This is an automated email from the ASF dual-hosted git repository.

kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 228304eb058 Fix Huge Number of Watches in ZooKeeper (#17482)
228304eb058 is described below

commit 228304eb058df54ec22a4de028a62eeda92f7f74
Author: Virushade <[email protected]>
AuthorDate: Thu May 8 14:52:13 2025 +0800

    Fix Huge Number of Watches in ZooKeeper (#17482)
    
    
    ---------
    
    Co-authored-by: asdf2014 <[email protected]>
---
 docs/api-reference/tasks-api.md                    |   4 +-
 docs/configuration/index.md                        |   1 +
 .../indexing/worker/WorkerCuratorCoordinator.java  |  12 +-
 .../indexing/worker/WorkerTaskMonitorTest.java     |   3 +
 .../indexing/worker/http/WorkerResourceTest.java   |   3 +
 .../org/apache/druid/curator/CuratorConfig.java    |   8 +
 .../druid/curator/announcement/Announceable.java   |  56 +++
 .../druid/curator/announcement/NodeAnnouncer.java  | 404 +++++++++++++++++++++
 .../{Announcer.java => PathChildrenAnnouncer.java} | 109 +++---
 .../curator/announcement/ServiceAnnouncer.java     |  35 ++
 .../discovery/CuratorDruidNodeAnnouncer.java       |  11 +-
 .../org/apache/druid/guice/AnnouncerModule.java    |  30 +-
 .../guice/annotations/DirectExecutorAnnouncer.java |  34 ++
 .../guice/annotations/SingleThreadedAnnouncer.java |  34 ++
 .../coordination/BatchDataSegmentAnnouncer.java    |   9 +-
 .../CuratorDataSegmentServerAnnouncer.java         |   7 +-
 .../client/BatchServerInventoryViewTest.java       |  24 +-
 .../curator/announcement/NodeAnnouncerTest.java    | 367 +++++++++++++++++++
 ...cerTest.java => PathChildrenAnnouncerTest.java} |  95 ++---
 .../CuratorDruidNodeAnnouncerAndDiscoveryTest.java |   7 +-
 .../BatchDataSegmentAnnouncerTest.java             |  21 +-
 website/.spelling                                  |   2 +
 22 files changed, 1125 insertions(+), 151 deletions(-)

diff --git a/docs/api-reference/tasks-api.md b/docs/api-reference/tasks-api.md
index d94be5b0c5f..69a0a015361 100644
--- a/docs/api-reference/tasks-api.md
+++ b/docs/api-reference/tasks-api.md
@@ -1059,9 +1059,9 @@ Host: http://ROUTER_IP:ROUTER_PORT
     2023-07-03T22:11:17,933 INFO [task-runner-0-priority-0] 
org.apache.kafka.common.metrics.Metrics - Closing reporter 
org.apache.kafka.common.metrics.JmxReporter
     2023-07-03T22:11:17,933 INFO [task-runner-0-priority-0] 
org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
     2023-07-03T22:11:17,935 INFO [task-runner-0-priority-0] 
org.apache.kafka.common.utils.AppInfoParser - App info kafka.consumer for 
consumer-kafka-supervisor-dcanhmig-1 unregistered
-    2023-07-03T22:11:17,936 INFO [task-runner-0-priority-0] 
org.apache.druid.curator.announcement.Announcer - Unannouncing 
[/druid/internal-discovery/PEON/localhost:8100]
+    2023-07-03T22:11:17,936 INFO [task-runner-0-priority-0] 
org.apache.druid.curator.announcement.PathChildrenAnnouncer - Unannouncing 
[/druid/internal-discovery/PEON/localhost:8100]
     2023-07-03T22:11:17,972 INFO [task-runner-0-priority-0] 
org.apache.druid.curator.discovery.CuratorDruidNodeAnnouncer - Unannounced self 
[{"druidNode":{"service":"druid/middleManager","host":"localhost","bindOnHost":false,"plaintextPort":8100,"port":-1,"tlsPort":-1,"enablePlaintextPort":true,"enableTlsPort":false},"nodeType":"peon","services":{"dataNodeService":{"type":"dataNodeService","tier":"_default_tier","maxSize":0,"type":"indexer-executor","serverType":"indexer-executor","prior
 [...]
-    2023-07-03T22:11:17,972 INFO [task-runner-0-priority-0] 
org.apache.druid.curator.announcement.Announcer - Unannouncing 
[/druid/announcements/localhost:8100]
+    2023-07-03T22:11:17,972 INFO [task-runner-0-priority-0] 
org.apache.druid.curator.announcement.PathChildrenAnnouncer - Unannouncing 
[/druid/announcements/localhost:8100]
     2023-07-03T22:11:17,996 INFO [task-runner-0-priority-0] 
org.apache.druid.indexing.worker.executor.ExecutorLifecycle - Task completed 
with status: {
     "id" : "index_kafka_social_media_0e905aa31037879_nommnaeg",
     "status" : "SUCCESS",
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index c7ddc73efa0..1a7a1e1516e 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -148,6 +148,7 @@ We recommend just setting the base ZK path and the ZK 
service host, but all ZK p
 |`druid.zk.service.connectionTimeoutMs`|ZooKeeper connection timeout, in 
milliseconds.|`15000`|
 |`druid.zk.service.compress`|Boolean flag for whether or not created Znodes 
should be compressed.|`true`|
 |`druid.zk.service.acl`|Boolean flag for whether or not to enable ACL security 
for ZooKeeper. If ACL is enabled, zNode creators will have all 
permissions.|`false`|
+|`druid.zk.service.pathChildrenCacheStrategy`|Dictates the underlying caching 
strategy for service announcements. Set to `true` to have announcers use Apache 
Curator's PathChildrenCache strategy; otherwise, the NodeCache strategy is 
used. Consider using the NodeCache strategy when you are dealing with a huge 
number of ZooKeeper watches in your cluster.|`true`|
 
 #### Path configuration
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java
index 7e7c09893b9..c018c6a1a63 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java
@@ -25,11 +25,11 @@ import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.druid.curator.CuratorUtils;
-import org.apache.druid.curator.announcement.Announcer;
+import org.apache.druid.curator.announcement.ServiceAnnouncer;
+import org.apache.druid.guice.annotations.DirectExecutorAnnouncer;
 import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -54,7 +54,7 @@ public class WorkerCuratorCoordinator
   private final ObjectMapper jsonMapper;
   private final RemoteTaskRunnerConfig config;
   private final CuratorFramework curatorFramework;
-  private final Announcer announcer;
+  private final ServiceAnnouncer announcer;
 
   private final String baseAnnouncementsPath;
   private final String baseTaskPath;
@@ -69,6 +69,7 @@ public class WorkerCuratorCoordinator
       IndexerZkConfig indexerZkConfig,
       RemoteTaskRunnerConfig config,
       CuratorFramework curatorFramework,
+      @DirectExecutorAnnouncer ServiceAnnouncer announcer,
       Worker worker
   )
   {
@@ -76,8 +77,7 @@ public class WorkerCuratorCoordinator
     this.config = config;
     this.curatorFramework = curatorFramework;
     this.worker = worker;
-
-    this.announcer = new Announcer(curatorFramework, Execs.directExecutor());
+    this.announcer = announcer;
 
     this.baseAnnouncementsPath = 
getPath(Arrays.asList(indexerZkConfig.getAnnouncementsPath(), 
worker.getHost()));
     this.baseTaskPath = getPath(Arrays.asList(indexerZkConfig.getTasksPath(), 
worker.getHost()));
@@ -87,7 +87,7 @@ public class WorkerCuratorCoordinator
   @LifecycleStart
   public void start() throws Exception
   {
-    log.info("WorkerCuratorCoordinator good to go sir. Server[%s]", 
worker.getHost());
+    log.info("WorkerCuratorCoordinator good to go. Server[%s]", 
worker.getHost());
     synchronized (lock) {
       if (started) {
         return;
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
index 6c0462dcf50..79d96fde860 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -28,6 +28,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.TestingCluster;
 import org.apache.druid.client.coordinator.NoopCoordinatorClient;
 import org.apache.druid.curator.PotentiallyGzippedCompressionProvider;
+import org.apache.druid.curator.announcement.NodeAnnouncer;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexing.common.IndexingServiceCondition;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
@@ -47,6 +48,7 @@ import 
org.apache.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.query.policy.NoopPolicyEnforcer;
 import org.apache.druid.rpc.indexing.NoopOverlordClient;
 import org.apache.druid.rpc.indexing.OverlordClient;
@@ -142,6 +144,7 @@ public class WorkerTaskMonitorTest
         ),
         new TestRemoteTaskRunnerConfig(new Period("PT1S")),
         cf,
+        new NodeAnnouncer(cf, Execs.directExecutor()),
         worker
     );
     workerCuratorCoordinator.start();
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java
index 0ad900dd8c4..df8e34ab111 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java
@@ -26,6 +26,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.TestingCluster;
 import org.apache.druid.curator.PotentiallyGzippedCompressionProvider;
 import org.apache.druid.curator.ZkEnablementConfig;
+import org.apache.druid.curator.announcement.NodeAnnouncer;
 import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
 import org.apache.druid.indexing.worker.Worker;
 import org.apache.druid.indexing.worker.WorkerCuratorCoordinator;
@@ -33,6 +34,7 @@ import org.apache.druid.indexing.worker.WorkerTaskMonitor;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.server.initialization.IndexerZkConfig;
 import org.apache.druid.server.initialization.ZkPathsConfig;
 import org.easymock.EasyMock;
@@ -95,6 +97,7 @@ public class WorkerResourceTest
         }, null, null, null, null),
         new RemoteTaskRunnerConfig(),
         cf,
+        new NodeAnnouncer(cf, Execs.directExecutor()),
         worker
     );
     curatorCoordinator.start();
diff --git a/server/src/main/java/org/apache/druid/curator/CuratorConfig.java 
b/server/src/main/java/org/apache/druid/curator/CuratorConfig.java
index 7a53ee941d7..e98a457b0f3 100644
--- a/server/src/main/java/org/apache/druid/curator/CuratorConfig.java
+++ b/server/src/main/java/org/apache/druid/curator/CuratorConfig.java
@@ -63,6 +63,9 @@ public class CuratorConfig
   @JsonProperty("maxZkRetries")
   private int maxZkRetries = 29;
 
+  @JsonProperty("pathChildrenCacheStrategy")
+  private boolean pathChildrenCacheStrategy = true;
+
   public static CuratorConfig create(String hosts)
   {
     CuratorConfig config = new CuratorConfig();
@@ -141,4 +144,9 @@ public class CuratorConfig
   {
     return maxZkRetries;
   }
+
+  public boolean getPathChildrenCacheStrategy()
+  {
+    return pathChildrenCacheStrategy;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/curator/announcement/Announceable.java 
b/server/src/main/java/org/apache/druid/curator/announcement/Announceable.java
new file mode 100644
index 00000000000..cecdb8dbcd5
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/curator/announcement/Announceable.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+/**
+ * The {@link Announceable} is a representation of an announcement to be made 
in ZooKeeper.
+ */
+class Announceable
+{
+  /**
+   * Represents the path in ZooKeeper where the announcement will be made.
+   */
+  final String path;
+
+  /**
+   * Holds the actual data to be announced.
+   */
+  final byte[] bytes;
+
+  /**
+   * Indicates whether parent nodes that the announcer had to create for this 
announcement should be removed again when the announcer is stopped.
+   * This can be useful for cleaning up unused paths in ZooKeeper.
+   */
+  final boolean removeParentsIfCreated;
+
+  public Announceable(String path, byte[] bytes, boolean 
removeParentsIfCreated)
+  {
+    this.path = path;
+    this.bytes = bytes;
+    this.removeParentsIfCreated = removeParentsIfCreated;
+  }
+
+  // This should be used for updates only, where removeParentsIfCreated is not 
relevant.
+  public Announceable(String path, byte[] bytes)
+  {
+    // removeParentsIfCreated is irrelevant, so we can use dummy value "false".
+    this(path, bytes, false);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java 
b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java
new file mode 100644
index 00000000000..d38bf79fa38
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The {@link NodeAnnouncer} class is responsible for announcing a single node
+ * in a ZooKeeper ensemble. It creates an ephemeral node at a specified path
+ * and monitors its existence to ensure that it remains active until it is
+ * explicitly unannounced or the object is closed.
+ *
+ * <p>
+ * This class uses Apache Curator's NodeCache recipe under the hood to track a 
single
+ * node, along with all of its parent's status. See {@link 
PathChildrenAnnouncer} for an announcer that
+ * uses the PathChildrenCache recipe instead.
+ * </p>
+ */
+public class NodeAnnouncer implements ServiceAnnouncer
+{
+  private static final Logger log = new Logger(NodeAnnouncer.class);
+
+  private final CuratorFramework curator;
+  private final ExecutorService nodeCacheExecutor;
+
+  private final ConcurrentHashMap<String, CuratorCache> listeners = new 
ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, byte[]> announcedPaths = new 
ConcurrentHashMap<>();
+
+  @GuardedBy("toAnnounce")
+  private boolean started = false;
+
+  /**
+   * This list holds announcements that are still pending. If
+   * {@link #announce(String, byte[], boolean)} is called before this 
announcer has been started,
+   * the announcement is stored here and made later during the {@link #start} 
method.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toAnnounce = new ArrayList<>();
+
+  /**
+   * This list holds updates that are still pending. If {@link #update} is
+   * called before this announcer has been started, the update is stored here
+   * and applied later during the {@link #start} method.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toUpdate = new ArrayList<>();
+
+  /**
+   * This list keeps track of the parent paths that this node announcer created
+   * for announcements made with {@code removeParentIfCreated} set. When the
+   * {@link #stop} method is called, it tries to delete all paths in this list.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<String> parentsIBuilt = new CopyOnWriteArrayList<>();
+
+  public NodeAnnouncer(CuratorFramework curator, ExecutorService exec)
+  {
+    this.curator = curator;
+    this.nodeCacheExecutor = exec;
+  }
+
+  @VisibleForTesting
+  Set<String> getAddedPaths()
+  {
+    return announcedPaths.keySet();
+  }
+
+  @LifecycleStart
+  @Override
+  public void start()
+  {
+    log.debug("Starting Announcer");
+    synchronized (toAnnounce) {
+      if (started) {
+        log.debug("Announcer has already been started by another thread, 
ignoring start request.");
+        return;
+      }
+
+      started = true;
+
+      for (Announceable announceable : toAnnounce) {
+        announce(announceable.path, announceable.bytes, 
announceable.removeParentsIfCreated);
+      }
+      toAnnounce.clear();
+
+      for (Announceable announceable : toUpdate) {
+        update(announceable.path, announceable.bytes);
+      }
+      toUpdate.clear();
+    }
+  }
+
+  @LifecycleStop
+  @Override
+  public void stop()
+  {
+    log.debug("Stopping Announcer");
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("Announcer has already been stopped by another thread, 
ignoring stop request.");
+        return;
+      }
+
+      started = false;
+      closeResources();
+    }
+  }
+
+  @GuardedBy("toAnnounce")
+  private void closeResources()
+  {
+    try {
+      // Close all caches...
+      CloseableUtils.closeAll(listeners.values());
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    finally {
+      nodeCacheExecutor.shutdown();
+    }
+
+    for (String announcementPath : announcedPaths.keySet()) {
+      unannounce(announcementPath);
+    }
+
+    if (!parentsIBuilt.isEmpty()) {
+      CuratorMultiTransaction transaction = curator.transaction();
+
+      ArrayList<CuratorOp> operations = new ArrayList<>();
+      for (String parent : parentsIBuilt) {
+        try {
+          operations.add(curator.transactionOp().delete().forPath(parent));
+        }
+        catch (Exception e) {
+          log.info(e, "Unable to delete parent[%s] when closing Announcer.", 
parent);
+        }
+      }
+
+      try {
+        transaction.forOperations(operations);
+      }
+      catch (Exception e) {
+        log.info(e, "Unable to commit transaction when closing Announcer.");
+      }
+    }
+  }
+
+  /**
+   * Overload of {@link #announce(String, byte[], boolean)} that passes true 
for removeParentIfCreated: a parent node created for path is removed on stop.
+   */
+  @Override
+  public void announce(String path, byte[] bytes)
+  {
+    announce(path, bytes, true);
+  }
+
+  /**
+   * Announces the provided bytes at the given path.
+   *
+   * <p>
+   * Announcement using {@link NodeAnnouncer} will create an ephemeral znode 
at the specified path, and listens for
+   * changes on your znode. Your znode will exist until it is unannounced, or 
until {@link #stop()} is called.
+   * </p>
+   *
+   * @param path                  The path to announce at
+   * @param bytes                 The payload to announce
+   * @param removeParentIfCreated remove parent of "path" if we had created 
that parent during announcement
+   */
+  @Override
+  public void announce(String path, byte[] bytes, boolean 
removeParentIfCreated)
+  {
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("Announcer has not started yet, queuing announcement for 
later processing...");
+        toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated));
+        return;
+      }
+    }
+
+    final String parentPath = ZKPaths.getPathAndNode(path).getPath();
+    byte[] announcedPayload = announcedPaths.get(path);
+
+    // If announcedPayload is null, this means that we have yet to announce 
this path.
+    // There is a possibility that the parent paths do not exist, so we check 
if we need to create the parent path first.
+    if (announcedPayload == null) {
+      boolean buildParentPath = false;
+      try {
+        buildParentPath = curator.checkExists().forPath(parentPath) == null;
+      }
+      catch (Exception e) {
+        log.debug(e, "Problem checking if the parent existed, ignoring.");
+      }
+
+      // Synchronize to make sure that I only create a listener once.
+      synchronized (toAnnounce) {
+        if (!listeners.containsKey(path)) {
+          final CuratorCache cache = createCacheForPath(path);
+
+          if (started) {
+            if (buildParentPath) {
+              createPath(parentPath, removeParentIfCreated);
+            }
+            startCache(cache);
+            listeners.put(path, cache);
+          }
+        }
+      }
+    }
+
+    boolean created = false;
+    synchronized (toAnnounce) {
+      if (started) {
+        byte[] oldBytes = announcedPaths.putIfAbsent(path, bytes);
+
+        if (oldBytes == null) {
+          created = true;
+        } else if (!Arrays.equals(oldBytes, bytes)) {
+          throw new IAE("Cannot reannounce different values under the same 
path.");
+        }
+      }
+    }
+
+    if (created) {
+      try {
+        createAnnouncement(path, bytes);
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @GuardedBy("toAnnounce")
+  private CuratorCache createCacheForPath(String path)
+  {
+    final CuratorCache cache = CuratorCache.build(curator, path, 
CuratorCache.Options.SINGLE_NODE_CACHE);
+
+    cache.listenable().addListener(
+        (type, oldData, data) -> {
+          if (type == CuratorCacheListener.Type.NODE_DELETED) {
+            final byte[] previouslyAnnouncedData = announcedPaths.get(path);
+            if (previouslyAnnouncedData != null) {
+              try {
+                log.info("ZooKeeper Node[%s] dropped, reinstating...", path);
+                createAnnouncement(path, previouslyAnnouncedData);
+              }
+              catch (Exception e) {
+                throw new RuntimeException(e);
+              }
+            }
+          }
+        }, nodeCacheExecutor
+    );
+
+    return cache;
+  }
+
+  @Override
+  public void update(final String path, final byte[] bytes)
+  {
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("Announcer has not started yet, queuing updates for later 
processing...");
+        toUpdate.add(new Announceable(path, bytes));
+        return;
+      }
+
+      byte[] oldBytes = announcedPaths.get(path);
+
+      if (oldBytes == null) {
+        throw new ISE("Cannot update path[%s] that hasn't been announced!", 
path);
+      }
+
+      try {
+        if (!Arrays.equals(oldBytes, bytes)) {
+          announcedPaths.put(path, bytes);
+          updateAnnouncement(path, bytes);
+        }
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private void createAnnouncement(final String path, byte[] value) throws 
Exception
+  {
+    
curator.create().compressed().withMode(CreateMode.EPHEMERAL).inBackground().forPath(path,
 value);
+  }
+
+  private void updateAnnouncement(final String path, final byte[] value) 
throws Exception
+  {
+    curator.setData().compressed().inBackground().forPath(path, value);
+  }
+
+  /**
+   * Unannounces an announcement created at path.  Note that if all 
announcements get removed, the Announcer
+   * will continue to have ZK watches on paths because clearing them out is a 
source of ugly race conditions.
+   * <p>
+   * If you need to completely clear all the state of what is being watched 
and announced, stop() the Announcer.
+   *
+   * @param path the path to unannounce
+   */
+  @Override
+  public void unannounce(String path)
+  {
+    synchronized (toAnnounce) {
+      final byte[] value = announcedPaths.remove(path);
+
+      if (value == null) {
+        log.debug("Path[%s] not announced, cannot unannounce.", path);
+        return;
+      }
+    }
+
+    log.info("unannouncing [%s]", path);
+
+    try {
+      CuratorOp deleteOp = curator.transactionOp().delete().forPath(path);
+      curator.transaction().forOperations(deleteOp);
+    }
+    catch (KeeperException.NoNodeException e) {
+      log.info("Unannounced node[%s] that does not exist.", path);
+    }
+    catch (KeeperException.NotEmptyException e) {
+      log.warn("Unannouncing non-empty path[%s]", path);
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void startCache(CuratorCache cache)
+  {
+    try {
+      cache.start();
+    }
+    catch (Throwable e) {
+      throw CloseableUtils.closeAndWrapInCatch(e, cache);
+    }
+  }
+
+  @GuardedBy("toAnnounce")
+  private void createPath(String parentPath, boolean removeParentsIfCreated)
+  {
+    try {
+      curator.create().creatingParentsIfNeeded().forPath(parentPath);
+      if (removeParentsIfCreated) {
+        // We keep track of all parents we have built, so we can delete them 
later on when needed.
+        parentsIBuilt.add(parentPath);
+      }
+
+      log.debug("Created parentPath[%s], %s remove on stop.", parentPath, 
removeParentsIfCreated ? "will" : "will not");
+    }
+    catch (KeeperException.NodeExistsException e) {
+      log.info(e, "Problem creating parentPath[%s], someone else created it 
first?", parentPath);
+    }
+    catch (Exception e) {
+      log.error(e, "Unhandled exception when creating parentPath[%s].", 
parentPath);
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java 
b/server/src/main/java/org/apache/druid/curator/announcement/PathChildrenAnnouncer.java
similarity index 78%
rename from 
server/src/main/java/org/apache/druid/curator/announcement/Announcer.java
rename to 
server/src/main/java/org/apache/druid/curator/announcement/PathChildrenAnnouncer.java
index 4abd2404524..ec754d96be2 100644
--- a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java
+++ 
b/server/src/main/java/org/apache/druid/curator/announcement/PathChildrenAnnouncer.java
@@ -20,9 +20,10 @@
 package org.apache.druid.curator.announcement;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.transaction.CuratorTransaction;
-import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorOp;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
@@ -37,7 +38,6 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.utils.CloseableUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -53,20 +53,30 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * Announces things on Zookeeper.
+ * The {@link PathChildrenAnnouncer} class manages the announcement of a node, 
and watches all child
+ * and sibling nodes under the specified path in a ZooKeeper ensemble. It 
monitors these nodes
+ * to ensure their existence and manage their lifecycle collectively.
+ *
+ * <p>
+ * This class uses Apache Curator's PathChildrenCache recipe under the hood to 
track all znodes
+ * under the specified node's parent. See {@link NodeAnnouncer} for an 
announcer that
+ * uses the NodeCache recipe instead.
+ * </p>
  */
-public class Announcer
+public class PathChildrenAnnouncer implements ServiceAnnouncer
 {
-  private static final Logger log = new Logger(Announcer.class);
+  private static final Logger log = new Logger(PathChildrenAnnouncer.class);
 
   private final CuratorFramework curator;
   private final PathChildrenCacheFactory factory;
   private final ExecutorService pathChildrenCacheExecutor;
 
+  @GuardedBy("toAnnounce")
   private final List<Announceable> toAnnounce = new ArrayList<>();
+  @GuardedBy("toAnnounce")
   private final List<Announceable> toUpdate = new ArrayList<>();
-  private final ConcurrentMap<String, PathChildrenCache> listeners = new 
ConcurrentHashMap<>();
-  private final ConcurrentMap<String, ConcurrentMap<String, byte[]>> 
announcements = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, PathChildrenCache> listeners = new 
ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, ConcurrentHashMap<String, byte[]>> 
announcements = new ConcurrentHashMap<>();
   private final List<String> parentsIBuilt = new CopyOnWriteArrayList<>();
 
   // Used for testing
@@ -74,7 +84,7 @@ public class Announcer
 
   private boolean started = false;
 
-  public Announcer(
+  public PathChildrenAnnouncer(
       CuratorFramework curator,
       ExecutorService exec
   )
@@ -102,11 +112,13 @@ public class Announcer
   }
 
   @LifecycleStart
+  @Override
   public void start()
   {
     log.debug("Starting Announcer.");
     synchronized (toAnnounce) {
       if (started) {
+        log.debug("Announcer has already been started by another thread, 
ignoring start request.");
         return;
       }
 
@@ -125,11 +137,13 @@ public class Announcer
   }
 
   @LifecycleStop
+  @Override
   public void stop()
   {
     log.debug("Stopping Announcer.");
     synchronized (toAnnounce) {
       if (!started) {
+        log.debug("Announcer has already been stopped by another thread, 
ignoring stop request.");
         return;
       }
 
@@ -145,7 +159,7 @@ public class Announcer
         pathChildrenCacheExecutor.shutdown();
       }
 
-      for (Map.Entry<String, ConcurrentMap<String, byte[]>> entry : 
announcements.entrySet()) {
+      for (Map.Entry<String, ConcurrentHashMap<String, byte[]>> entry : 
announcements.entrySet()) {
         String basePath = entry.getKey();
 
         for (String announcementPath : entry.getValue().keySet()) {
@@ -154,45 +168,56 @@ public class Announcer
       }
 
       if (!parentsIBuilt.isEmpty()) {
-        CuratorTransaction transaction = curator.inTransaction();
+        CuratorMultiTransaction transaction = curator.transaction();
+
+        ArrayList<CuratorOp> operations = new ArrayList<>();
         for (String parent : parentsIBuilt) {
           try {
-            transaction = transaction.delete().forPath(parent).and();
+            operations.add(curator.transactionOp().delete().forPath(parent));
           }
           catch (Exception e) {
-            log.info(e, "Unable to delete parent[%s], boooo.", parent);
+            log.info(e, "Unable to delete parent[%s] when closing Announcer.", 
parent);
           }
         }
+
         try {
-          ((CuratorTransactionFinal) transaction).commit();
+          transaction.forOperations(operations);
         }
         catch (Exception e) {
-          log.info(e, "Unable to commit transaction. Please feed the 
hamsters");
+          log.info(e, "Unable to commit transaction when closing Announcer.");
         }
       }
     }
   }
 
   /**
-   * Like announce(path, bytes, true).
+   * Shorthand for {@link #announce(String, byte[], boolean)} with 
removeParentIfCreated set to true.
    */
+  @Override
   public void announce(String path, byte[] bytes)
   {
     announce(path, bytes, true);
   }
 
   /**
-   * Announces the provided bytes at the given path.  Announcement means that 
it will create an ephemeral node
-   * and monitor it to make sure that it always exists until it is unannounced 
or this object is closed.
+   * Announces the provided bytes at the given path.
+   *
+   * <p>
+   * Announcement using {@link PathChildrenAnnouncer} creates an ephemeral 
znode at the specified path, and uses its parent
+   * path to watch all the znodes under that parent, i.e. the announced znode 
and its siblings. The announced znode is kept
+   * in existence until it is unannounced, or until {@link #stop()} is called.
+   * </p>
    *
    * @param path                  The path to announce at
    * @param bytes                 The payload to announce
-   * @param removeParentIfCreated remove parent of "path" if we had created 
that parent
+   * @param removeParentIfCreated remove parent of "path" if we had created 
that parent during announcement
    */
+  @Override
   public void announce(String path, byte[] bytes, boolean 
removeParentIfCreated)
   {
     synchronized (toAnnounce) {
       if (!started) {
+        log.debug("Announcer has not started yet, queuing announcement for 
later processing...");
         toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated));
         return;
       }
@@ -215,11 +240,7 @@ public class Announcer
         log.debug(e, "Problem checking if the parent existed, ignoring.");
       }
 
-      // I don't have a watcher on this path yet, create a Map and start 
watching.
-      announcements.putIfAbsent(parentPath, new ConcurrentHashMap<>());
-
-      // Guaranteed to be non-null, but might be a map put in there by another 
thread.
-      final ConcurrentMap<String, byte[]> finalSubPaths = 
announcements.get(parentPath);
+      final ConcurrentHashMap<String, byte[]> finalSubPaths = 
announcements.computeIfAbsent(parentPath, key -> new ConcurrentHashMap<>());
 
       // Synchronize to make sure that I only create a listener once.
       synchronized (finalSubPaths) {
@@ -329,12 +350,12 @@ public class Announcer
     }
   }
 
+  @Override
   public void update(final String path, final byte[] bytes)
   {
     synchronized (toAnnounce) {
       if (!started) {
-        // removeParentsIfCreated is not relevant for updates; use dummy value 
"false".
-        toUpdate.add(new Announceable(path, bytes, false));
+        toUpdate.add(new Announceable(path, bytes));
         return;
       }
     }
@@ -347,7 +368,7 @@ public class Announcer
     ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath);
 
     if (subPaths == null || subPaths.get(nodePath) == null) {
-      throw new ISE("Cannot update a path[%s] that hasn't been announced!", 
path);
+      throw new ISE("Cannot update path[%s] that hasn't been announced!", 
path);
     }
 
     synchronized (toAnnounce) {
@@ -365,14 +386,14 @@ public class Announcer
     }
   }
 
-  private String createAnnouncement(final String path, byte[] value) throws 
Exception
+  private void createAnnouncement(final String path, byte[] value) throws 
Exception
   {
-    return 
curator.create().compressed().withMode(CreateMode.EPHEMERAL).inBackground().forPath(path,
 value);
+    
curator.create().compressed().withMode(CreateMode.EPHEMERAL).inBackground().forPath(path,
 value);
   }
 
-  private Stat updateAnnouncement(final String path, final byte[] value) 
throws Exception
+  private void updateAnnouncement(final String path, final byte[] value) 
throws Exception
   {
-    return curator.setData().compressed().inBackground().forPath(path, value);
+    curator.setData().compressed().inBackground().forPath(path, value);
   }
 
   /**
@@ -383,6 +404,7 @@ public class Announcer
    *
    * @param path the path to unannounce
    */
+  @Override
   public void unannounce(String path)
   {
     final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
@@ -397,10 +419,14 @@ public class Announcer
     log.info("Unannouncing [%s]", path);
 
     try {
-      curator.inTransaction().delete().forPath(path).and().commit();
+      CuratorOp deleteOp = curator.transactionOp().delete().forPath(path);
+      curator.transaction().forOperations(deleteOp);
     }
     catch (KeeperException.NoNodeException e) {
-      log.info("Node[%s] didn't exist anyway...", path);
+      log.info("Unannounced node[%s] that does not exist.", path);
+    }
+    catch (KeeperException.NotEmptyException e) {
+      log.warn("Unannouncing non-empty path[%s]", path);
     }
     catch (Exception e) {
       throw new RuntimeException(e);
@@ -426,22 +452,11 @@ public class Announcer
       }
       log.debug("Created parentPath[%s], %s remove on stop.", parentPath, 
removeParentsIfCreated ? "will" : "will not");
     }
-    catch (Exception e) {
+    catch (KeeperException.NodeExistsException e) {
       log.info(e, "Problem creating parentPath[%s], someone else created it 
first?", parentPath);
     }
-  }
-
-  private static class Announceable
-  {
-    final String path;
-    final byte[] bytes;
-    final boolean removeParentsIfCreated;
-
-    public Announceable(String path, byte[] bytes, boolean 
removeParentsIfCreated)
-    {
-      this.path = path;
-      this.bytes = bytes;
-      this.removeParentsIfCreated = removeParentsIfCreated;
+    catch (Exception e) {
+      log.error(e, "Unhandled exception when creating parentPath[%s].", 
parentPath);
     }
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/curator/announcement/ServiceAnnouncer.java
 
b/server/src/main/java/org/apache/druid/curator/announcement/ServiceAnnouncer.java
new file mode 100644
index 00000000000..e079e1340e9
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/curator/announcement/ServiceAnnouncer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+public interface ServiceAnnouncer
+{
+  void start();
+
+  void stop();
+
+  void announce(String path, byte[] bytes);
+
+  void announce(String path, byte[] bytes, boolean removeParentIfCreated);
+
+  void update(String path, byte[] bytes);
+
+  void unannounce(String path);
+}
diff --git 
a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java
 
b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java
index 5b536c65d39..662706578ba 100644
--- 
a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java
+++ 
b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java
@@ -23,11 +23,12 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Inject;
 import org.apache.curator.utils.ZKPaths;
-import org.apache.druid.curator.announcement.Announcer;
+import org.apache.druid.curator.announcement.ServiceAnnouncer;
 import org.apache.druid.discovery.DiscoveryDruidNode;
 import org.apache.druid.discovery.DruidNodeAnnouncer;
 import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.guice.annotations.SingleThreadedAnnouncer;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.DruidNode;
@@ -42,12 +43,16 @@ public class CuratorDruidNodeAnnouncer implements 
DruidNodeAnnouncer
 
   private static final Logger log = new 
Logger(CuratorDruidNodeAnnouncer.class);
 
-  private final Announcer announcer;
+  private final ServiceAnnouncer announcer;
   private final ZkPathsConfig config;
   private final ObjectMapper jsonMapper;
 
   @Inject
-  public CuratorDruidNodeAnnouncer(Announcer announcer, ZkPathsConfig config, 
@Json ObjectMapper jsonMapper)
+  public CuratorDruidNodeAnnouncer(
+      @SingleThreadedAnnouncer ServiceAnnouncer announcer,
+      ZkPathsConfig config,
+      @Json ObjectMapper jsonMapper
+  )
   {
     this.announcer = announcer;
     this.config = config;
diff --git a/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java 
b/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java
index 6b0c96641a6..b6a4d472283 100644
--- a/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java
+++ b/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java
@@ -24,8 +24,13 @@ import com.google.inject.Inject;
 import com.google.inject.Module;
 import com.google.inject.Provides;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.druid.curator.CuratorConfig;
 import org.apache.druid.curator.ZkEnablementConfig;
-import org.apache.druid.curator.announcement.Announcer;
+import org.apache.druid.curator.announcement.NodeAnnouncer;
+import org.apache.druid.curator.announcement.PathChildrenAnnouncer;
+import org.apache.druid.curator.announcement.ServiceAnnouncer;
+import org.apache.druid.guice.annotations.DirectExecutorAnnouncer;
+import org.apache.druid.guice.annotations.SingleThreadedAnnouncer;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.server.coordination.BatchDataSegmentAnnouncer;
 import org.apache.druid.server.coordination.CuratorDataSegmentServerAnnouncer;
@@ -64,9 +69,28 @@ public class AnnouncerModule implements Module
   }
 
   @Provides
+  @SingleThreadedAnnouncer
   @ManageLifecycleAnnouncements
-  public Announcer getAnnouncer(CuratorFramework curator)
+  public ServiceAnnouncer 
getAnnouncerWithSingleThreadedExecutorService(CuratorFramework curator, 
CuratorConfig config)
   {
-    return new Announcer(curator, Execs.singleThreaded("Announcer-%s"));
+    boolean usingPathChildrenCacheAnnouncer = 
config.getPathChildrenCacheStrategy();
+    if (usingPathChildrenCacheAnnouncer) {
+      return new PathChildrenAnnouncer(curator, 
Execs.singleThreaded("Announcer-%s"));
+    } else {
+      return new NodeAnnouncer(curator, Execs.singleThreaded("Announcer-%s"));
+    }
+  }
+
+  @Provides
+  @DirectExecutorAnnouncer
+  @ManageLifecycleAnnouncements
+  public ServiceAnnouncer 
getAnnouncerWithDirectExecutorService(CuratorFramework curator, CuratorConfig 
config)
+  {
+    boolean usingPathChildrenCacheAnnouncer = 
config.getPathChildrenCacheStrategy();
+    if (usingPathChildrenCacheAnnouncer) {
+      return new PathChildrenAnnouncer(curator, Execs.directExecutor());
+    } else {
+      return new NodeAnnouncer(curator, Execs.directExecutor());
+    }
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/guice/annotations/DirectExecutorAnnouncer.java
 
b/server/src/main/java/org/apache/druid/guice/annotations/DirectExecutorAnnouncer.java
new file mode 100644
index 00000000000..0d675469222
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/guice/annotations/DirectExecutorAnnouncer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice.annotations;
+
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@BindingAnnotation
+@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface DirectExecutorAnnouncer
+{
+}
diff --git 
a/server/src/main/java/org/apache/druid/guice/annotations/SingleThreadedAnnouncer.java
 
b/server/src/main/java/org/apache/druid/guice/annotations/SingleThreadedAnnouncer.java
new file mode 100644
index 00000000000..8f815301ecf
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/guice/annotations/SingleThreadedAnnouncer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice.annotations;
+
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@BindingAnnotation
+@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface SingleThreadedAnnouncer
+{
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
 
b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
index 7fff124ac81..ad721f4ef7c 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
@@ -32,7 +32,8 @@ import com.google.inject.Provider;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.druid.common.utils.UUIDUtils;
 import org.apache.druid.curator.ZkEnablementConfig;
-import org.apache.druid.curator.announcement.Announcer;
+import org.apache.druid.curator.announcement.ServiceAnnouncer;
+import org.apache.druid.guice.annotations.SingleThreadedAnnouncer;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
@@ -64,7 +65,7 @@ public class BatchDataSegmentAnnouncer implements 
DataSegmentAnnouncer
   private final BatchDataSegmentAnnouncerConfig config;
 
   @Nullable //Null if zk is disabled or isSkipSegmentAnnouncementOnZk = true
-  private final Announcer announcer;
+  private final ServiceAnnouncer announcer;
 
   private final ObjectMapper jsonMapper;
   private final String liveSegmentLocation;
@@ -91,7 +92,7 @@ public class BatchDataSegmentAnnouncer implements 
DataSegmentAnnouncer
       DruidServerMetadata server,
       final BatchDataSegmentAnnouncerConfig config,
       ZkPathsConfig zkPaths,
-      Provider<Announcer> announcerProvider,
+      @SingleThreadedAnnouncer Provider<ServiceAnnouncer> announcerProvider,
       ObjectMapper jsonMapper,
       ZkEnablementConfig zkEnablementConfig
   )
@@ -127,7 +128,7 @@ public class BatchDataSegmentAnnouncer implements 
DataSegmentAnnouncer
       DruidServerMetadata server,
       final BatchDataSegmentAnnouncerConfig config,
       ZkPathsConfig zkPaths,
-      Announcer announcer,
+      ServiceAnnouncer announcer,
       ObjectMapper jsonMapper
   )
   {
diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/CuratorDataSegmentServerAnnouncer.java
 
b/server/src/main/java/org/apache/druid/server/coordination/CuratorDataSegmentServerAnnouncer.java
index 1f456bd3445..ec029ba6c9a 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordination/CuratorDataSegmentServerAnnouncer.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/CuratorDataSegmentServerAnnouncer.java
@@ -23,7 +23,8 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Inject;
 import org.apache.curator.utils.ZKPaths;
-import org.apache.druid.curator.announcement.Announcer;
+import org.apache.druid.curator.announcement.ServiceAnnouncer;
+import org.apache.druid.guice.annotations.SingleThreadedAnnouncer;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.initialization.ZkPathsConfig;
 
@@ -37,7 +38,7 @@ public class CuratorDataSegmentServerAnnouncer implements 
DataSegmentServerAnnou
 
   private final DruidServerMetadata server;
   private final ZkPathsConfig config;
-  private final Announcer announcer;
+  private final ServiceAnnouncer announcer;
   private final ObjectMapper jsonMapper;
 
   private final Object lock = new Object();
@@ -48,7 +49,7 @@ public class CuratorDataSegmentServerAnnouncer implements 
DataSegmentServerAnnou
   public CuratorDataSegmentServerAnnouncer(
       DruidServerMetadata server,
       ZkPathsConfig config,
-      Announcer announcer,
+      @SingleThreadedAnnouncer ServiceAnnouncer announcer,
       ObjectMapper jsonMapper
   )
   {
diff --git 
a/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java
 
b/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java
index 86ee872dafe..f5ed9ba924e 100644
--- 
a/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java
+++ 
b/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java
@@ -38,7 +38,7 @@ import org.apache.druid.client.BatchServerInventoryView;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.client.ServerView;
 import org.apache.druid.curator.PotentiallyGzippedCompressionProvider;
-import org.apache.druid.curator.announcement.Announcer;
+import org.apache.druid.curator.announcement.NodeAnnouncer;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
@@ -88,7 +88,7 @@ public class BatchServerInventoryViewTest
   private TestingCluster testingCluster;
   private CuratorFramework cf;
   private ObjectMapper jsonMapper;
-  private Announcer announcer;
+  private NodeAnnouncer nodeAnnouncer;
   private BatchDataSegmentAnnouncer segmentAnnouncer;
   private DataSegmentServerAnnouncer serverAnnouncer;
   private Set<DataSegment> testSegments;
@@ -116,11 +116,8 @@ public class BatchServerInventoryViewTest
 
     jsonMapper = TestHelper.makeJsonMapper();
 
-    announcer = new Announcer(
-        cf,
-        Execs.directExecutor()
-    );
-    announcer.start();
+    nodeAnnouncer = new NodeAnnouncer(cf, Execs.directExecutor());
+    nodeAnnouncer.start();
 
     DruidServerMetadata serverMetadata = new DruidServerMetadata(
         "id",
@@ -144,7 +141,7 @@ public class BatchServerInventoryViewTest
     serverAnnouncer = new CuratorDataSegmentServerAnnouncer(
         serverMetadata,
         zkPathsConfig,
-        announcer,
+        nodeAnnouncer,
         jsonMapper
     );
     serverAnnouncer.announce();
@@ -160,7 +157,7 @@ public class BatchServerInventoryViewTest
           }
         },
         zkPathsConfig,
-        announcer,
+        nodeAnnouncer,
         jsonMapper
     );
 
@@ -225,7 +222,7 @@ public class BatchServerInventoryViewTest
     batchServerInventoryView.stop();
     filteredBatchServerInventoryView.stop();
     serverAnnouncer.unannounce();
-    announcer.stop();
+    nodeAnnouncer.stop();
     cf.close();
     testingCluster.stop();
   }
@@ -425,7 +422,10 @@ public class BatchServerInventoryViewTest
   public void testSameTimeZnode() throws Exception
   {
     final int numThreads = INITIAL_SEGMENTS / 10;
-    final ListeningExecutorService executor = 
MoreExecutors.listeningDecorator(Execs.multiThreaded(numThreads, 
"BatchServerInventoryViewTest-%d"));
+    final ListeningExecutorService executor = 
MoreExecutors.listeningDecorator(Execs.multiThreaded(
+        numThreads,
+        "BatchServerInventoryViewTest-%d"
+    ));
 
     segmentAnnouncer.announceSegments(testSegments);
 
@@ -474,7 +474,7 @@ public class BatchServerInventoryViewTest
                           return TEST_BASE_PATH;
                         }
                       },
-                      announcer,
+                      nodeAnnouncer,
                       jsonMapper
                   );
                   List<DataSegment> segments = new ArrayList<>();
diff --git 
a/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java
 
b/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java
new file mode 100644
index 00000000000..6841f87b314
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
+import org.apache.curator.test.KillSession;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.druid.curator.CuratorTestBase;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.Stat;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+
+public class NodeAnnouncerTest extends CuratorTestBase
+{
+  private ExecutorService exec;
+
+  @BeforeEach
+  public void setUp() throws Exception
+  {
+    setupServerAndCurator();
+    exec = Execs.singleThreaded("test-node-announcer-sanity-%s");
+    curator.start();
+    curator.blockUntilConnected();
+  }
+
+  @AfterEach
+  public void tearDown()
+  {
+    tearDownServerAndCurator();
+  }
+
+  @Test
+  @Timeout(60_000)
+  public void testCreateParentPath() throws Exception
+  {
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+    final byte[] billy = StringUtils.toUtf8("billy");
+    final String testPath = "/newParent/testPath";
+    final String parentPath = ZKPaths.getPathAndNode(testPath).getPath();
+
+    announcer.start();
+    Assertions.assertNull(curator.checkExists().forPath(parentPath), "Parent 
path should not exist before announcement");
+    announcer.announce(testPath, billy);
+
+    // Wait for the announcement to be processed
+    while (curator.checkExists().forPath(testPath) == null) {
+      Thread.sleep(100);
+    }
+
+    Assertions.assertNotNull(curator.checkExists().forPath(parentPath), 
"Parent path should be created");
+    Assertions.assertArrayEquals(billy, 
curator.getData().decompressed().forPath(testPath));
+    announcer.stop();
+  }
+
+  @Test
+  @Timeout(60_000)
+  public void testAnnounceSamePathWithDifferentPayloadThrowsIAE() throws 
Exception
+  {
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+    final byte[] billy = StringUtils.toUtf8("billy");
+    final byte[] tilly = StringUtils.toUtf8("tilly");
+    final String testPath = "/testPath";
+
+    announcer.start();
+    announcer.announce(testPath, billy);
+    while (curator.checkExists().forPath(testPath) == null) {
+      Thread.sleep(100);
+    }
+    Assertions.assertArrayEquals(billy, 
curator.getData().decompressed().forPath(testPath));
+
+    // Nothing wrong when we announce same payload on the same path.
+    announcer.announce(testPath, billy);
+
+    // Expect an exception when announcing a different payload
+    IAE exception = Assertions.assertThrows(IAE.class, () -> 
announcer.announce(testPath, tilly));
+    Assertions.assertEquals("Cannot reannounce different values under the same 
path.", exception.getMessage());
+
+    // Confirm that the announcement remains unchanged.
+    Assertions.assertArrayEquals(billy, 
curator.getData().decompressed().forPath(testPath));
+    announcer.stop();
+  }
+
+  @Test
+  public void testUpdateBeforeStartingNodeAnnouncer() throws Exception
+  {
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+    final byte[] billy = StringUtils.toUtf8("billy");
+    final byte[] tilly = StringUtils.toUtf8("tilly");
+    final String testPath = "/testAnnounce";
+
+    // Queue update before the announcer is started
+    announcer.update(testPath, tilly);
+    announcer.announce(testPath, billy);
+    announcer.start();
+
+    // Verify that the update took precedence
+    Assertions.assertArrayEquals(tilly, 
curator.getData().decompressed().forPath(testPath));
+    announcer.stop();
+  }
+
+  @Test
+  public void testUpdateSuccessfully() throws Exception
+  {
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+    final byte[] billy = StringUtils.toUtf8("billy");
+    final byte[] tilly = StringUtils.toUtf8("tilly");
+    final String testPath = "/testUpdate";
+
+    announcer.start();
+    announcer.announce(testPath, billy);
+    Assertions.assertArrayEquals(billy, 
curator.getData().decompressed().forPath(testPath));
+
+    // Update with the same payload: nothing should change.
+    announcer.update(testPath, billy);
+    Assertions.assertArrayEquals(billy, 
curator.getData().decompressed().forPath(testPath));
+
+    // Update with a new payload.
+    announcer.update(testPath, tilly);
+    Assertions.assertArrayEquals(tilly, 
curator.getData().decompressed().forPath(testPath));
+    announcer.stop();
+  }
+
+  @Test
+  public void testUpdateNonExistentPath()
+  {
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+    final byte[] billy = StringUtils.toUtf8("billy");
+    final String testPath = "/testUpdate";
+
+    announcer.start();
+
+    ISE exception = Assertions.assertThrows(ISE.class, () -> 
announcer.update(testPath, billy));
+    Assertions.assertEquals("Cannot update path[/testUpdate] that hasn't been 
announced!", exception.getMessage());
+    announcer.stop();
+  }
+
+  /**
+   * Walks one announcer through its lifecycle: a path announced before {@code start()} is only visible
+   * after start, a second path can be announced while running, an externally deleted node is re-created
+   * with its payload, {@code unannounce} removes a single path, and {@code stop()} removes what is left.
+   */
+  @Test
+  // JUnit 5's @Timeout is expressed in seconds unless a unit is given, so 60 yields the intended
+  // one-minute limit (60_000 would have allowed roughly 16.7 hours).
+  @Timeout(60)
+  public void testSanity() throws Exception
+  {
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+
+    final byte[] billy = StringUtils.toUtf8("billy");
+    final String testPath1 = "/test1";
+    final String testPath2 = "/somewhere/test2";
+    announcer.announce(testPath1, billy);
+
+    Assertions.assertNull(
+        curator.checkExists().forPath(testPath1),
+        "/test1 does not exist before announcer start"
+    );
+    Assertions.assertNull(
+        curator.checkExists().forPath(testPath2),
+        "/somewhere/test2 does not exist before announcer start"
+    );
+
+    announcer.start();
+    // Poll until the announcer reports the path as added; the method-level timeout bounds this loop.
+    while (!announcer.getAddedPaths().contains(testPath1)) {
+      Thread.sleep(100);
+    }
+
+    try {
+      Assertions.assertArrayEquals(
+          billy,
+          curator.getData().decompressed().forPath(testPath1),
+          "/test1 has data"
+      );
+      Assertions.assertNull(
+          curator.checkExists().forPath(testPath2),
+          "/somewhere/test2 still does not exist"
+      );
+
+      announcer.announce(testPath2, billy);
+
+      Assertions.assertArrayEquals(
+          billy,
+          curator.getData().decompressed().forPath(testPath1),
+          "/test1 still has data"
+      );
+      Assertions.assertArrayEquals(
+          billy,
+          curator.getData().decompressed().forPath(testPath2),
+          "/somewhere/test2 has data"
+      );
+
+      // Register the CREATE listener before deleting, so the re-creation event cannot be missed.
+      final CountDownLatch latch = createCountdownLatchForPaths(testPath1);
+
+      final CuratorOp deleteOp = curator.transactionOp().delete().forPath(testPath1);
+      final Collection<CuratorTransactionResult> results = curator.transaction().forOperations(deleteOp);
+      Assertions.assertEquals(1, results.size(), "Expected one result from the delete op");
+      final CuratorTransactionResult result = results.iterator().next();
+      Assertions.assertEquals(Code.OK.intValue(), result.getError(), "Expected OK code on delete");
+
+      Assertions.assertTrue(timing.forWaiting().awaitLatch(latch), "Wait for /test1 to be recreated");
+
+      Assertions.assertArrayEquals(
+          billy,
+          curator.getData().decompressed().forPath(testPath1),
+          "Expected /test1 data to be restored"
+      );
+      Assertions.assertArrayEquals(
+          billy,
+          curator.getData().decompressed().forPath(testPath2),
+          "Expected /somewhere/test2 data to remain"
+      );
+
+      announcer.unannounce(testPath1);
+      Assertions.assertNull(
+          curator.checkExists().forPath(testPath1),
+          "Expected /test1 to be unannounced"
+      );
+      Assertions.assertArrayEquals(
+          billy,
+          curator.getData().decompressed().forPath(testPath2),
+          "Expected /somewhere/test2 to remain"
+      );
+    }
+    finally {
+      announcer.stop();
+    }
+
+    Assertions.assertNull(
+        curator.checkExists().forPath(testPath1),
+        "Expected /test1 to remain unannounced"
+    );
+    Assertions.assertNull(
+        curator.checkExists().forPath(testPath2),
+        "Expected /somewhere/test2 to be unannounced"
+    );
+  }
+
+  /**
+   * Kills the ZooKeeper session underneath a running announcer and checks that both announced nodes are
+   * created again with their payload, then that stopping the announcer makes both nodes disappear.
+   */
+  @Test
+  // JUnit 5's @Timeout is expressed in seconds unless a unit is given, so 60 yields the intended
+  // one-minute limit (60_000 would have allowed roughly 16.7 hours).
+  @Timeout(60)
+  public void testSessionKilled() throws Exception
+  {
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+    try {
+      // Pre-create the parent of the second path before the announcer starts.
+      CuratorOp createOp = curator.transactionOp().create().forPath("/somewhere");
+      curator.transaction().forOperations(createOp);
+      announcer.start();
+
+      final byte[] billy = StringUtils.toUtf8("billy");
+      final String testPath1 = "/test1";
+      final String testPath2 = "/somewhere/test2";
+      final String[] paths = new String[]{testPath1, testPath2};
+      announcer.announce(testPath1, billy);
+      announcer.announce(testPath2, billy);
+
+      Assertions.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath1));
+      Assertions.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath2));
+
+      // Listen for the re-creation of both nodes before the session is killed.
+      final CountDownLatch latch = createCountdownLatchForPaths(paths);
+      KillSession.kill(curator.getZookeeperClient().getZooKeeper(), server.getConnectString());
+
+      Assertions.assertTrue(timing.forWaiting().awaitLatch(latch), "Await latch after killing session");
+
+      Assertions.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath1));
+      Assertions.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath2));
+
+      announcer.stop();
+
+      // Poll until both nodes are gone; the method-level timeout bounds this loop.
+      while ((curator.checkExists().forPath(testPath1) != null) ||
+              (curator.checkExists().forPath(testPath2) != null)) {
+        Thread.sleep(100);
+      }
+
+      Assertions.assertNull(curator.checkExists().forPath(testPath1));
+      Assertions.assertNull(curator.checkExists().forPath(testPath2));
+    }
+    finally {
+      // Second stop() call on the success path; kept so a failure above still stops the announcer.
+      announcer.stop();
+    }
+  }
+
+  /**
+   * When an announcement is made with {@code removeParentsIfCreated = true} and the parent znode did not
+   * exist beforehand, the parent exists while announced and is gone again after the announcer stops.
+   */
+  @Test
+  public void testRemovesParentIfCreated() throws Exception
+  {
+    final NodeAnnouncer nodeAnnouncer = new NodeAnnouncer(curator, exec);
+
+    final byte[] payload = StringUtils.toUtf8("billy");
+    final String announcedPath = "/somewhere/test";
+    final String parentPath = ZKPaths.getPathAndNode(announcedPath).getPath();
+
+    nodeAnnouncer.start();
+    try {
+      // Precondition: the parent is absent before anything is announced.
+      Assertions.assertNull(curator.checkExists().forPath(parentPath));
+
+      awaitAnnounce(nodeAnnouncer, announcedPath, payload, true);
+
+      Assertions.assertNotNull(curator.checkExists().forPath(parentPath));
+    }
+    finally {
+      nodeAnnouncer.stop();
+    }
+
+    // After stop() the parent must have been cleaned up as well.
+    Assertions.assertNull(curator.checkExists().forPath(parentPath));
+  }
+
+  /**
+   * A parent znode that already existed before the announcement is left untouched: its mzxid is the same
+   * before announcing, while announced, and after the announcer has stopped.
+   */
+  @Test
+  public void testLeavesBehindParentPathIfAlreadyExists() throws Exception
+  {
+    final NodeAnnouncer nodeAnnouncer = new NodeAnnouncer(curator, exec);
+
+    final byte[] payload = StringUtils.toUtf8("billy");
+    final String announcedPath = "/somewhere/test2";
+    final String parentPath = ZKPaths.getPathAndNode(announcedPath).getPath();
+
+    // Create the parent up front and remember its state for the later comparisons.
+    curator.create().forPath(parentPath);
+    final Stat statBeforeAnnounce = curator.checkExists().forPath(parentPath);
+
+    nodeAnnouncer.start();
+    try {
+      Assertions.assertEquals(
+          statBeforeAnnounce.getMzxid(),
+          curator.checkExists().forPath(parentPath).getMzxid()
+      );
+
+      awaitAnnounce(nodeAnnouncer, announcedPath, payload, true);
+
+      Assertions.assertEquals(
+          statBeforeAnnounce.getMzxid(),
+          curator.checkExists().forPath(parentPath).getMzxid()
+      );
+    }
+    finally {
+      nodeAnnouncer.stop();
+    }
+
+    Assertions.assertEquals(
+        statBeforeAnnounce.getMzxid(),
+        curator.checkExists().forPath(parentPath).getMzxid()
+    );
+  }
+
+  /**
+   * When an announcement is made with {@code removeParentsIfCreated = false}, a parent znode that did not
+   * exist beforehand is present while announced and is still present after the announcer stops.
+   */
+  @Test
+  public void testLeavesParentPathsUntouchedWhenInstructed() throws Exception
+  {
+    final NodeAnnouncer nodeAnnouncer = new NodeAnnouncer(curator, exec);
+
+    final byte[] payload = StringUtils.toUtf8("billy");
+    final String announcedPath = "/somewhere/test2";
+    final String parentPath = ZKPaths.getPathAndNode(announcedPath).getPath();
+
+    nodeAnnouncer.start();
+    try {
+      // Precondition: the parent is absent before anything is announced.
+      Assertions.assertNull(curator.checkExists().forPath(parentPath));
+
+      awaitAnnounce(nodeAnnouncer, announcedPath, payload, false);
+
+      Assertions.assertNotNull(curator.checkExists().forPath(parentPath));
+    }
+    finally {
+      nodeAnnouncer.stop();
+    }
+
+    // The parent must survive stop() because removal was not requested.
+    Assertions.assertNotNull(curator.checkExists().forPath(parentPath));
+  }
+
+  /**
+   * Announces {@code path} and blocks until a CREATE event for it has been observed on the curator.
+   * The wait is bounded by the test's {@code timing}; if no event arrives in time the test fails
+   * instead of hanging, which matters for callers that carry no {@code @Timeout} of their own.
+   *
+   * @param announcer              announcer under test
+   * @param path                   znode path to announce
+   * @param bytes                  payload to announce at {@code path}
+   * @param removeParentsIfCreated forwarded unchanged to {@code announce}
+   */
+  private void awaitAnnounce(
+          final NodeAnnouncer announcer,
+          final String path,
+          final byte[] bytes,
+          boolean removeParentsIfCreated
+  ) throws InterruptedException
+  {
+    // The listener has to be in place before announcing, otherwise the CREATE event could be missed.
+    final CountDownLatch latch = createCountdownLatchForPaths(path);
+    announcer.announce(path, bytes, removeParentsIfCreated);
+    Assertions.assertTrue(
+        timing.forWaiting().awaitLatch(latch),
+        "Timed out waiting for CREATE event of announced path"
+    );
+  }
+
+  /**
+   * Registers a curator listener and returns a latch that opens once a CREATE event has been seen for
+   * every one of the given paths.
+   *
+   * Each path is counted at most once, so a repeated CREATE event for one path cannot open the latch
+   * while another path is still missing. The listener stays registered on the curator.
+   *
+   * @param paths znode paths whose CREATE events are awaited; duplicates are counted once
+   * @return latch whose count equals the number of distinct paths
+   */
+  private CountDownLatch createCountdownLatchForPaths(String... paths)
+  {
+    // Fully-qualified names keep this helper independent of the file's import block.
+    final java.util.Set<String> pendingPaths = java.util.concurrent.ConcurrentHashMap.newKeySet();
+    pendingPaths.addAll(Arrays.asList(paths));
+
+    final CountDownLatch latch = new CountDownLatch(pendingPaths.size());
+    curator.getCuratorListenable().addListener((client, event) -> {
+      final String createdPath = event.getPath();
+      // remove() returns true only the first time a path is seen, which de-duplicates repeated events.
+      if (event.getType() == CuratorEventType.CREATE && createdPath != null && pendingPaths.remove(createdPath)) {
+        latch.countDown();
+      }
+    });
+
+    return latch;
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/curator/announcement/AnnouncerTest.java 
b/server/src/test/java/org/apache/druid/curator/announcement/PathChildrenAnnouncerTest.java
similarity index 78%
rename from 
server/src/test/java/org/apache/druid/curator/announcement/AnnouncerTest.java
rename to 
server/src/test/java/org/apache/druid/curator/announcement/PathChildrenAnnouncerTest.java
index e12d8bc47bb..095a38f69ce 100644
--- 
a/server/src/test/java/org/apache/druid/curator/announcement/AnnouncerTest.java
+++ 
b/server/src/test/java/org/apache/druid/curator/announcement/PathChildrenAnnouncerTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.curator.announcement;
 
-import com.google.common.collect.Sets;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
@@ -39,16 +38,17 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 
 /**
+ *
  */
-public class AnnouncerTest extends CuratorTestBase
+public class PathChildrenAnnouncerTest extends CuratorTestBase
 {
-  private static final Logger log = new Logger(AnnouncerTest.class);
+  private static final Logger log = new 
Logger(PathChildrenAnnouncerTest.class);
   private ExecutorService exec;
 
   @Before
@@ -56,6 +56,8 @@ public class AnnouncerTest extends CuratorTestBase
   {
     setupServerAndCurator();
     exec = Execs.singleThreaded("test-announcer-sanity-%s");
+    curator.start();
+    curator.blockUntilConnected();
   }
 
   @After
@@ -67,9 +69,7 @@ public class AnnouncerTest extends CuratorTestBase
   @Test(timeout = 60_000L)
   public void testSanity() throws Exception
   {
-    curator.start();
-    curator.blockUntilConnected();
-    Announcer announcer = new Announcer(curator, exec);
+    PathChildrenAnnouncer announcer = new PathChildrenAnnouncer(curator, exec);
     announcer.initializeAddedChildren();
 
     final byte[] billy = StringUtils.toUtf8("billy");
@@ -98,19 +98,8 @@ public class AnnouncerTest extends CuratorTestBase
           curator.getData().decompressed().forPath(testPath2)
       );
 
-      final CountDownLatch latch = new CountDownLatch(1);
-      curator.getCuratorListenable().addListener(
-          new CuratorListener()
-          {
-            @Override
-            public void eventReceived(CuratorFramework client, CuratorEvent 
event)
-            {
-              if (event.getType() == CuratorEventType.CREATE && 
event.getPath().equals(testPath1)) {
-                latch.countDown();
-              }
-            }
-          }
-      );
+      final CountDownLatch latch = createCountdownLatchForPaths(testPath1);
+
       final CuratorOp deleteOp = 
curator.transactionOp().delete().forPath(testPath1);
       final Collection<CuratorTransactionResult> results = 
curator.transaction().forOperations(deleteOp);
       Assert.assertEquals(1, results.size());
@@ -149,39 +138,24 @@ public class AnnouncerTest extends CuratorTestBase
   @Test(timeout = 60_000L)
   public void testSessionKilled() throws Exception
   {
-    curator.start();
-    curator.blockUntilConnected();
-    Announcer announcer = new Announcer(curator, exec);
+    PathChildrenAnnouncer announcer = new PathChildrenAnnouncer(curator, exec);
     try {
-      curator.inTransaction().create().forPath("/somewhere").and().commit();
+      CuratorOp createOp = 
curator.transactionOp().create().forPath("/somewhere");
+      curator.transaction().forOperations(createOp);
       announcer.start();
 
       final byte[] billy = StringUtils.toUtf8("billy");
       final String testPath1 = "/test1";
       final String testPath2 = "/somewhere/test2";
-      final Set<String> paths = Sets.newHashSet(testPath1, testPath2);
+      final String[] paths = new String[]{testPath1, testPath2};
       announcer.announce(testPath1, billy);
       announcer.announce(testPath2, billy);
 
       Assert.assertArrayEquals(billy, 
curator.getData().decompressed().forPath(testPath1));
       Assert.assertArrayEquals(billy, 
curator.getData().decompressed().forPath(testPath2));
 
-      final CountDownLatch latch = new CountDownLatch(1);
-      curator.getCuratorListenable().addListener(
-          new CuratorListener()
-          {
-            @Override
-            public void eventReceived(CuratorFramework client, CuratorEvent 
event)
-            {
-              if (event.getType() == CuratorEventType.CREATE) {
-                paths.remove(event.getPath());
-                if (paths.isEmpty()) {
-                  latch.countDown();
-                }
-              }
-            }
-          }
-      );
+      final CountDownLatch latch = createCountdownLatchForPaths(paths);
+
       KillSession.kill(curator.getZookeeperClient().getZooKeeper(), 
server.getConnectString());
 
       Assert.assertTrue(timing.forWaiting().awaitLatch(latch));
@@ -204,11 +178,9 @@ public class AnnouncerTest extends CuratorTestBase
   }
 
   @Test
-  public void testCleansUpItsLittleTurdlings() throws Exception
+  public void testRemovesParentIfCreated() throws Exception
   {
-    curator.start();
-    curator.blockUntilConnected();
-    Announcer announcer = new Announcer(curator, exec);
+    PathChildrenAnnouncer announcer = new PathChildrenAnnouncer(curator, exec);
 
     final byte[] billy = StringUtils.toUtf8("billy");
     final String testPath = "/somewhere/test2";
@@ -230,14 +202,12 @@ public class AnnouncerTest extends CuratorTestBase
   }
 
   @Test
-  public void testLeavesBehindTurdlingsThatAlreadyExisted() throws Exception
+  public void testLeavesBehindParentPathIfAlreadyExists() throws Exception
   {
-    curator.start();
-    curator.blockUntilConnected();
-    Announcer announcer = new Announcer(curator, exec);
+    PathChildrenAnnouncer announcer = new PathChildrenAnnouncer(curator, exec);
 
     final byte[] billy = StringUtils.toUtf8("billy");
-    final String testPath = "/somewhere/test2";
+    final String testPath = "/somewhere/test";
     final String parent = ZKPaths.getPathAndNode(testPath).getPath();
 
     curator.create().forPath(parent);
@@ -259,14 +229,12 @@ public class AnnouncerTest extends CuratorTestBase
   }
 
   @Test
-  public void testLeavesBehindTurdlingsWhenToldTo() throws Exception
+  public void testLeavesParentPathsUntouchedWhenInstructed() throws Exception
   {
-    curator.start();
-    curator.blockUntilConnected();
-    Announcer announcer = new Announcer(curator, exec);
+    PathChildrenAnnouncer announcer = new PathChildrenAnnouncer(curator, exec);
 
     final byte[] billy = StringUtils.toUtf8("billy");
-    final String testPath = "/somewhere/test2";
+    final String testPath = "/somewhere/test";
     final String parent = ZKPaths.getPathAndNode(testPath).getPath();
 
     announcer.start();
@@ -285,26 +253,33 @@ public class AnnouncerTest extends CuratorTestBase
   }
 
   private void awaitAnnounce(
-      final Announcer announcer,
+      final PathChildrenAnnouncer announcer,
       final String path,
       final byte[] bytes,
       boolean removeParentsIfCreated
   ) throws InterruptedException
   {
-    final CountDownLatch latch = new CountDownLatch(1);
+    CountDownLatch latch = createCountdownLatchForPaths(path);
+    announcer.announce(path, bytes, removeParentsIfCreated);
+    latch.await();
+  }
+
+  private CountDownLatch createCountdownLatchForPaths(String... path)
+  {
+    final CountDownLatch latch = new CountDownLatch(path.length);
     curator.getCuratorListenable().addListener(
         new CuratorListener()
         {
           @Override
           public void eventReceived(CuratorFramework client, CuratorEvent 
event)
           {
-            if (event.getType() == CuratorEventType.CREATE && 
event.getPath().equals(path)) {
+            if (event.getType() == CuratorEventType.CREATE && 
Arrays.asList(path).contains(event.getPath())) {
               latch.countDown();
             }
           }
         }
     );
-    announcer.announce(path, bytes, removeParentsIfCreated);
-    latch.await();
+
+    return latch;
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java
 
b/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java
index 93466876e6e..7d9368a8e1e 100644
--- 
a/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java
+++ 
b/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.curator.CuratorTestBase;
-import org.apache.druid.curator.announcement.Announcer;
+import org.apache.druid.curator.announcement.NodeAnnouncer;
 import org.apache.druid.discovery.DiscoveryDruidNode;
 import org.apache.druid.discovery.DruidNodeDiscovery;
 import org.apache.druid.discovery.NodeRole;
@@ -70,10 +70,7 @@ public class CuratorDruidNodeAnnouncerAndDiscoveryTest 
extends CuratorTestBase
     curator.start();
     curator.blockUntilConnected();
 
-    Announcer announcer = new Announcer(
-        curator,
-        Execs.directExecutor()
-    );
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, 
Execs.directExecutor());
     announcer.start();
 
     CuratorDruidNodeAnnouncer druidNodeAnnouncer = new 
CuratorDruidNodeAnnouncer(
diff --git 
a/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java
 
b/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java
index 98644cdc565..c3e04f04d6d 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java
@@ -31,7 +31,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.TestingCluster;
 import org.apache.druid.curator.PotentiallyGzippedCompressionProvider;
-import org.apache.druid.curator.announcement.Announcer;
+import org.apache.druid.curator.announcement.NodeAnnouncer;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.segment.TestHelper;
@@ -284,7 +284,10 @@ public class BatchDataSegmentAnnouncerTest
     List<String> zNodes = cf.getChildren().forPath(TEST_SEGMENTS_PATH);
 
     for (String zNode : zNodes) {
-      DataSegment announcedSegment = 
Iterables.getOnlyElement(segmentReader.read(JOINER.join(TEST_SEGMENTS_PATH, 
zNode)));
+      DataSegment announcedSegment = 
Iterables.getOnlyElement(segmentReader.read(JOINER.join(
+          TEST_SEGMENTS_PATH,
+          zNode
+      )));
       Assert.assertEquals(announcedSegment, firstSegment);
       Assert.assertTrue(announcedSegment.getDimensions().isEmpty());
       Assert.assertTrue(announcedSegment.getMetrics().isEmpty());
@@ -307,7 +310,10 @@ public class BatchDataSegmentAnnouncerTest
     List<String> zNodes = cf.getChildren().forPath(TEST_SEGMENTS_PATH);
 
     for (String zNode : zNodes) {
-      DataSegment announcedSegment = 
Iterables.getOnlyElement(segmentReader.read(JOINER.join(TEST_SEGMENTS_PATH, 
zNode)));
+      DataSegment announcedSegment = 
Iterables.getOnlyElement(segmentReader.read(JOINER.join(
+          TEST_SEGMENTS_PATH,
+          zNode
+      )));
       Assert.assertEquals(announcedSegment, firstSegment);
       Assert.assertNull(announcedSegment.getLoadSpec());
     }
@@ -402,7 +408,8 @@ public class BatchDataSegmentAnnouncerTest
     segmentAnnouncer.announceSegmentSchemas(
         taskId,
         new SegmentSchemas(Collections.singletonList(absoluteSchema1)),
-        new SegmentSchemas(Collections.singletonList(absoluteSchema1)));
+        new SegmentSchemas(Collections.singletonList(absoluteSchema1))
+    );
 
     ChangeRequestsSnapshot<DataSegmentChangeRequest> snapshot;
 
@@ -618,7 +625,7 @@ public class BatchDataSegmentAnnouncerTest
     }
   }
 
-  private static class TestAnnouncer extends Announcer
+  private static class TestAnnouncer extends NodeAnnouncer
   {
     private final ConcurrentHashMap<String, ConcurrentHashMap<byte[], 
AtomicInteger>> numPathAnnounced = new ConcurrentHashMap<>();
 
@@ -630,7 +637,9 @@ public class BatchDataSegmentAnnouncerTest
     @Override
     public void announce(String path, byte[] bytes, boolean 
removeParentIfCreated)
     {
-      numPathAnnounced.computeIfAbsent(path, k -> new 
ConcurrentHashMap<>()).computeIfAbsent(bytes, k -> new 
AtomicInteger(0)).incrementAndGet();
+      numPathAnnounced.computeIfAbsent(path, k -> new ConcurrentHashMap<>())
+                      .computeIfAbsent(bytes, k -> new AtomicInteger(0))
+                      .incrementAndGet();
       super.announce(path, bytes, removeParentIfCreated);
     }
   }
diff --git a/website/.spelling b/website/.spelling
index 39de8156d95..62e3d2bec31 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -198,6 +198,7 @@ Murmur3
 MVCC
 MV_TO_ARRAY
 NFS
+NodeCache
 OCF
 OIDC
 OLAP
@@ -209,6 +210,7 @@ OutputStream
 ParAccel
 ParseSpec
 ParseSpecs
+PathChildrenCache
 Protobuf
 protobuf
 pull-deps


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to