kgyrtkirk commented on code in PR #17482:
URL: https://github.com/apache/druid/pull/17482#discussion_r1971330840


##########
server/src/main/java/org/apache/druid/curator/announcement/Announcer.java:
##########
@@ -212,13 +234,13 @@ public void announce(String path, byte[] bytes, boolean 
removeParentIfCreated)
         }
       }
       catch (Exception e) {
-        log.debug(e, "Problem checking if the parent existed, ignoring.");
+        log.warn(e, "Failed to check existence of parent path. Proceeding 
without creating parent path.");
       }
 
       // I don't have a watcher on this path yet, create a Map and start 
watching.
       announcements.putIfAbsent(parentPath, new ConcurrentHashMap<>());
 
-      // Guaranteed to be non-null, but might be a map put in there by another 
thread.
+      // Guaranteed to be non-null, but might be a map put in here by another 
thread.

Review Comment:
   note: this seems like the usecase of `computeIfAbsent`



##########
server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorListener;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
+import org.apache.curator.test.KillSession;
+import org.apache.druid.curator.CuratorTestBase;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.ZKPathsUtils;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.Stat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+
+/**
+ *
+ */
+public class NodeAnnouncerTest extends CuratorTestBase
+{
+  private static final Logger log = new Logger(NodeAnnouncerTest.class);
+  private ExecutorService exec;
+
+  @Before
+  public void setUp() throws Exception
+  {
+    setupServerAndCurator();
+    exec = Execs.singleThreaded("test-node-announcer-sanity-%s");
+    curator.start();
+    curator.blockUntilConnected();
+  }
+
+  @After
+  public void tearDown()
+  {
+    tearDownServerAndCurator();
+  }
+
+  @Test(timeout = 60_000L)
+  public void testCreateParentPath() throws Exception
+  {
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+    final byte[] billy = StringUtils.toUtf8("billy");
+    final String testPath = "/newParent/testPath";
+    final String parentPath = ZKPathsUtils.getParentPath(testPath);
+
+    announcer.start();
+    Assert.assertNull("Parent path should not exist before announcement", 
curator.checkExists().forPath(parentPath));
+    announcer.announce(testPath, billy);
+
+    // Wait for the announcement to be processed
+    while (curator.checkExists().forPath(testPath) == null) {
+      Thread.sleep(100);
+    }
+
+    Assert.assertNotNull("Parent path should be created", 
curator.checkExists().forPath(parentPath));
+    Assert.assertArrayEquals(billy, 
curator.getData().decompressed().forPath(testPath));
+    announcer.stop();
+  }
+
+  @Test(timeout = 60_000L)
+  public void testAnnounceSamePathWithDifferentPayloadThrowsIAE() throws 
Exception
+  {
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+    final byte[] billy = StringUtils.toUtf8("billy");
+    final byte[] tilly = StringUtils.toUtf8("tilly");
+    final String testPath = "/testPath";
+
+    announcer.start();
+    announcer.announce(testPath, billy);
+    while (curator.checkExists().forPath(testPath) == null) {
+      Thread.sleep(100);
+    }
+    Assert.assertArrayEquals(billy, 
curator.getData().decompressed().forPath(testPath));
+
+    // Nothing wrong when we announce same path.
+    announcer.announce(testPath, billy);
+
+    // Something wrong when we announce different path.
+    Exception exception = Assert.assertThrows(IAE.class, () -> 
announcer.announce(testPath, tilly));
+    Assert.assertEquals(exception.getMessage(), "Cannot reannounce different 
values under the same path.");
+
+    // Confirm that the new announcement is invalidated, and we still have 
payload from previous announcement.
+    Assert.assertArrayEquals(billy, 
curator.getData().decompressed().forPath(testPath));
+    announcer.stop();
+  }
+
+  @Test
+  public void testUpdateBeforeStartingNodeAnnouncer() throws Exception
+  {
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+    final byte[] billy = StringUtils.toUtf8("billy");
+    final byte[] tilly = StringUtils.toUtf8("tilly");
+    final String testPath = "/testAnnounce";
+
+    announcer.update(testPath, tilly);
+    announcer.announce(testPath, billy);
+    announcer.start();
+
+    // Verify that the path was announced
+    Assert.assertArrayEquals(tilly, 
curator.getData().decompressed().forPath(testPath));
+    announcer.stop();
+  }
+
+  @Test
+  public void testUpdateSuccessfully() throws Exception
+  {
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+    final byte[] billy = StringUtils.toUtf8("billy");
+    final byte[] tilly = StringUtils.toUtf8("tilly");
+    final String testPath = "/testUpdate";
+
+    announcer.start();
+    announcer.announce(testPath, billy);
+    Assert.assertArrayEquals(billy, 
curator.getData().decompressed().forPath(testPath));
+
+    announcer.update(testPath, billy);
+    Assert.assertArrayEquals(billy, 
curator.getData().decompressed().forPath(testPath));
+
+    announcer.update(testPath, tilly);
+    Assert.assertArrayEquals(tilly, 
curator.getData().decompressed().forPath(testPath));
+    announcer.stop();
+  }
+
+  @Test
+  public void testUpdateNonExistentPath()
+  {
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+    final byte[] billy = StringUtils.toUtf8("billy");
+    final String testPath = "/testUpdate";
+
+    announcer.start();
+
+    Exception exception = Assert.assertThrows(ISE.class, () -> 
announcer.update(testPath, billy));
+    Assert.assertEquals(exception.getMessage(), "Cannot update 
path[/testUpdate] that hasn't been announced!");
+    announcer.stop();
+  }
+
+  @Test(timeout = 60_000L)
+  public void testSanity() throws Exception
+  {
+    NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+
+    final byte[] billy = StringUtils.toUtf8("billy");
+    final String testPath1 = "/test1";
+    final String testPath2 = "/somewhere/test2";
+    announcer.announce(testPath1, billy);
+
+    Assert.assertNull("/test1 does not exists", 
curator.checkExists().forPath(testPath1));
+    Assert.assertNull("/somewhere/test2 does not exists", 
curator.checkExists().forPath(testPath2));
+
+    announcer.start();
+    while (!announcer.getAddedPaths().contains("/test1")) {
+      Thread.sleep(100);

Review Comment:
   possibly infinite sleep; please add `@Timeout` to the test



##########
server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java:
##########
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.ZKPathsUtils;
+import org.apache.druid.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The {@link NodeAnnouncer} class is responsible for announcing a single node
+ * in a ZooKeeper ensemble. It creates an ephemeral node at a specified path
+ * and monitors its existence to ensure that it remains active until it is
+ * explicitly unannounced or the object is closed.
+ *
+ * <p>
+ * This class uses Apache Curator's NodeCache recipe under the hood to track a 
single
+ * node, along with all of its parent's status. See {@link Announcer} for an 
announcer that
+ * uses the PathChildrenCache recipe instead.
+ * </p>
+ */
+public class NodeAnnouncer
+{
+  private static final Logger log = new Logger(NodeAnnouncer.class);
+
+  private final CuratorFramework curator;
+  private final ExecutorService nodeCacheExecutor;
+
+  private final ConcurrentMap<String, NodeCache> listeners = new 
ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, byte[]> announcedPaths = new 
ConcurrentHashMap<>();
+
+  @GuardedBy("toAnnounce")
+  private boolean started = false;
+
+  /**
+   * This list holds paths that need to be announced. If a path is added to 
this list
+   * in the {@link #announce(String, byte[], boolean)} method before the 
connection to ZooKeeper is established,
+   * it will be stored here and announced later during the {@link #start} 
method.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toAnnounce = new ArrayList<>();
+
+  /**
+   * This list holds paths that need to be updated. If a path is added to this 
list
+   * in the {@link #update} method before the connection to ZooKeeper is 
established,
+   * it will be stored here and updated later during the {@link #start} method.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toUpdate = new ArrayList<>();
+
+  /**
+   * This list keeps track of all the paths created by this node announcer.
+   * When the {@link #stop} method is called,
+   * the node announcer is responsible for deleting all paths stored in this 
list.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<String> parentsIBuilt = new CopyOnWriteArrayList<>();
+
+  public NodeAnnouncer(CuratorFramework curator, ExecutorService exec)
+  {
+    this.curator = curator;
+    this.nodeCacheExecutor = exec;
+  }
+
+  @VisibleForTesting
+  Set<String> getAddedPaths()
+  {
+    return announcedPaths.keySet();
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    log.debug("Starting NodeAnnouncer");
+    synchronized (toAnnounce) {
+      if (started) {
+        log.debug("Cannot start NodeAnnouncer that has already started.");
+        return;
+      }
+
+      started = true;
+
+      for (Announceable announceable : toAnnounce) {
+        announce(announceable.path, announceable.bytes, 
announceable.removeParentsIfCreated);
+      }
+      toAnnounce.clear();
+
+      for (Announceable announceable : toUpdate) {
+        update(announceable.path, announceable.bytes);
+      }
+      toUpdate.clear();
+    }
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    log.debug("Stopping NodeAnnouncer");
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("Cannot stop NodeAnnouncer that has not started.");
+        return;
+      }
+
+      started = false;
+      closeResources();
+    }
+  }
+
+  @GuardedBy("toAnnounce")
+  private void closeResources()
+  {
+    try {
+      // Close all caches...
+      CloseableUtils.closeAll(listeners.values());
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    finally {
+      nodeCacheExecutor.shutdown();
+    }
+
+    for (String announcementPath : announcedPaths.keySet()) {
+      unannounce(announcementPath);
+    }
+
+    if (!parentsIBuilt.isEmpty()) {
+      CuratorMultiTransaction transaction = curator.transaction();
+
+      ArrayList<CuratorOp> operations = new ArrayList<>();
+      for (String parent : parentsIBuilt) {
+        try {
+          operations.add(curator.transactionOp().delete().forPath(parent));
+        }
+        catch (Exception e) {
+          log.info(e, "Unable to delete parent[%s] when closing 
NodeAnnouncer.", parent);
+        }
+      }
+
+      try {
+        transaction.forOperations(operations);
+      }
+      catch (Exception e) {
+        log.info(e, "Unable to commit transaction when closing 
NodeAnnouncer.");
+      }
+    }
+  }
+
+  /**
+   * Overload of {@link #announce(String, byte[], boolean)}, but removes 
parent node of path after announcement.
+   */
+  public void announce(String path, byte[] bytes)
+  {
+    announce(path, bytes, true);
+  }
+
+  /**
+   * Announces the provided bytes at the given path.
+   *
+   * <p>
+   * Announcement using {@link NodeAnnouncer} will create an ephemeral znode 
at the specified path, and listens for
+   * changes on your znode. Your znode will exist until it is unannounced, or 
until {@link #stop()} is called.
+   * </p>
+   *
+   * @param path                  The path to announce at
+   * @param bytes                 The payload to announce
+   * @param removeParentIfCreated remove parent of "path" if we had created 
that parent during announcement
+   */
+  public void announce(String path, byte[] bytes, boolean 
removeParentIfCreated)
+  {
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("NodeAnnouncer has not started yet, queuing announcement for 
later processing...");
+        toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated));
+        return;
+      }
+    }
+
+    final String parentPath = ZKPathsUtils.getParentPath(path);
+    byte[] announcedPayload = announcedPaths.get(path);
+
+    // If announcedPayload is null, this means that we have yet to announce 
this path.
+    // There is a possibility that the parent paths do not exist, so we check 
if we need to create the parent path first.
+    if (announcedPayload == null) {
+      boolean buildParentPath = false;
+      try {
+        buildParentPath = curator.checkExists().forPath(parentPath) == null;
+      }
+      catch (Exception e) {
+        log.warn(e, "Failed to check existence of parent path. Proceeding 
without creating parent path.");
+      }
+
+      // Synchronize to make sure that I only create a listener once.
+      synchronized (toAnnounce) {
+        if (!listeners.containsKey(path)) {
+          final NodeCache cache = setupNodeCache(path);
+
+          if (started) {
+            if (buildParentPath) {
+              createPath(parentPath, removeParentIfCreated);
+            }
+            startCache(cache);
+            listeners.put(path, cache);
+          }
+        }
+      }
+    }
+
+    final boolean readyToCreateAnnouncement = updateAnnouncedPaths(path, 
bytes);
+
+    if (readyToCreateAnnouncement) {
+      try {
+        createAnnouncement(path, bytes);
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @GuardedBy("toAnnounce")
+  private NodeCache setupNodeCache(String path)
+  {
+    final NodeCache cache = new NodeCache(curator, path, true);
+    cache.getListenable().addListener(
+        () -> nodeCacheExecutor.submit(() -> {
+          ChildData currentData = cache.getCurrentData();
+
+          if (currentData == null) {
+            // If currentData is null, and we know we have already announced 
the data,
+            // this means that the ephemeral node was unexpectedly removed.
+            // We will recreate the node again using the previous data.
+            final byte[] previouslyAnnouncedData = announcedPaths.get(path);
+            if (previouslyAnnouncedData != null) {
+              log.info("Node[%s] dropped, reinstating.", path);
+              try {
+                createAnnouncement(path, previouslyAnnouncedData);
+              }
+              catch (Exception e) {
+                throw new RuntimeException(e);
+              }
+            }
+          }
+        })
+    );
+    return cache;
+  }
+
+  private boolean updateAnnouncedPaths(String path, byte[] bytes)
+  {
+    synchronized (toAnnounce) {
+      if (!started) {
+        return false; // Do nothing if not started

Review Comment:
   could the following lead to lost events:
   * L212 `started` is true
   * `stop()` is called -> sets started to false
   * L293 return false
   
   is this a valid usecase? can an announcer started/stopped multiple times?
   (I think not) ; so it might have been better to just throw an error here; or 
remove it entirely



##########
server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java:
##########
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.ZKPathsUtils;
+import org.apache.druid.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The {@link NodeAnnouncer} class is responsible for announcing a single node
+ * in a ZooKeeper ensemble. It creates an ephemeral node at a specified path
+ * and monitors its existence to ensure that it remains active until it is
+ * explicitly unannounced or the object is closed.
+ *
+ * <p>
+ * This class uses Apache Curator's NodeCache recipe under the hood to track a 
single
+ * node, along with all of its parent's status. See {@link Announcer} for an 
announcer that
+ * uses the PathChildrenCache recipe instead.
+ * </p>
+ */
+public class NodeAnnouncer
+{
+  private static final Logger log = new Logger(NodeAnnouncer.class);
+
+  private final CuratorFramework curator;
+  private final ExecutorService nodeCacheExecutor;
+
+  private final ConcurrentMap<String, NodeCache> listeners = new 
ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, byte[]> announcedPaths = new 
ConcurrentHashMap<>();
+
+  @GuardedBy("toAnnounce")
+  private boolean started = false;
+
+  /**
+   * This list holds paths that need to be announced. If a path is added to 
this list
+   * in the {@link #announce(String, byte[], boolean)} method before the 
connection to ZooKeeper is established,
+   * it will be stored here and announced later during the {@link #start} 
method.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toAnnounce = new ArrayList<>();
+
+  /**
+   * This list holds paths that need to be updated. If a path is added to this 
list
+   * in the {@link #update} method before the connection to ZooKeeper is 
established,
+   * it will be stored here and updated later during the {@link #start} method.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toUpdate = new ArrayList<>();
+
+  /**
+   * This list keeps track of all the paths created by this node announcer.
+   * When the {@link #stop} method is called,
+   * the node announcer is responsible for deleting all paths stored in this 
list.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<String> parentsIBuilt = new CopyOnWriteArrayList<>();
+
+  public NodeAnnouncer(CuratorFramework curator, ExecutorService exec)
+  {
+    this.curator = curator;
+    this.nodeCacheExecutor = exec;
+  }
+
+  @VisibleForTesting
+  Set<String> getAddedPaths()
+  {
+    return announcedPaths.keySet();
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    log.debug("Starting NodeAnnouncer");
+    synchronized (toAnnounce) {
+      if (started) {
+        log.debug("Cannot start NodeAnnouncer that has already started.");
+        return;
+      }
+
+      started = true;
+
+      for (Announceable announceable : toAnnounce) {
+        announce(announceable.path, announceable.bytes, 
announceable.removeParentsIfCreated);
+      }
+      toAnnounce.clear();
+
+      for (Announceable announceable : toUpdate) {
+        update(announceable.path, announceable.bytes);
+      }
+      toUpdate.clear();
+    }
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    log.debug("Stopping NodeAnnouncer");
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("Cannot stop NodeAnnouncer that has not started.");
+        return;
+      }
+
+      started = false;
+      closeResources();
+    }
+  }
+
+  @GuardedBy("toAnnounce")
+  private void closeResources()
+  {
+    try {
+      // Close all caches...
+      CloseableUtils.closeAll(listeners.values());
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    finally {
+      nodeCacheExecutor.shutdown();
+    }
+
+    for (String announcementPath : announcedPaths.keySet()) {
+      unannounce(announcementPath);
+    }
+
+    if (!parentsIBuilt.isEmpty()) {
+      CuratorMultiTransaction transaction = curator.transaction();
+
+      ArrayList<CuratorOp> operations = new ArrayList<>();
+      for (String parent : parentsIBuilt) {
+        try {
+          operations.add(curator.transactionOp().delete().forPath(parent));
+        }
+        catch (Exception e) {
+          log.info(e, "Unable to delete parent[%s] when closing 
NodeAnnouncer.", parent);
+        }
+      }
+
+      try {
+        transaction.forOperations(operations);
+      }
+      catch (Exception e) {
+        log.info(e, "Unable to commit transaction when closing 
NodeAnnouncer.");
+      }
+    }
+  }
+
+  /**
+   * Overload of {@link #announce(String, byte[], boolean)}, but removes 
parent node of path after announcement.
+   */
+  public void announce(String path, byte[] bytes)
+  {
+    announce(path, bytes, true);
+  }
+
+  /**
+   * Announces the provided bytes at the given path.
+   *
+   * <p>
+   * Announcement using {@link NodeAnnouncer} will create an ephemeral znode 
at the specified path, and listens for
+   * changes on your znode. Your znode will exist until it is unannounced, or 
until {@link #stop()} is called.
+   * </p>
+   *
+   * @param path                  The path to announce at
+   * @param bytes                 The payload to announce
+   * @param removeParentIfCreated remove parent of "path" if we had created 
that parent during announcement
+   */
+  public void announce(String path, byte[] bytes, boolean 
removeParentIfCreated)
+  {
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("NodeAnnouncer has not started yet, queuing announcement for 
later processing...");
+        toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated));
+        return;
+      }
+    }
+
+    final String parentPath = ZKPathsUtils.getParentPath(path);
+    byte[] announcedPayload = announcedPaths.get(path);
+
+    // If announcedPayload is null, this means that we have yet to announce 
this path.
+    // There is a possibility that the parent paths do not exist, so we check 
if we need to create the parent path first.
+    if (announcedPayload == null) {
+      boolean buildParentPath = false;
+      try {
+        buildParentPath = curator.checkExists().forPath(parentPath) == null;
+      }
+      catch (Exception e) {
+        log.warn(e, "Failed to check existence of parent path. Proceeding 
without creating parent path.");
+      }
+
+      // Synchronize to make sure that I only create a listener once.
+      synchronized (toAnnounce) {
+        if (!listeners.containsKey(path)) {
+          final NodeCache cache = setupNodeCache(path);
+
+          if (started) {
+            if (buildParentPath) {
+              createPath(parentPath, removeParentIfCreated);
+            }
+            startCache(cache);
+            listeners.put(path, cache);
+          }
+        }
+      }
+    }
+
+    final boolean readyToCreateAnnouncement = updateAnnouncedPaths(path, 
bytes);
+
+    if (readyToCreateAnnouncement) {
+      try {
+        createAnnouncement(path, bytes);
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @GuardedBy("toAnnounce")
+  private NodeCache setupNodeCache(String path)
+  {
+    final NodeCache cache = new NodeCache(curator, path, true);
+    cache.getListenable().addListener(
+        () -> nodeCacheExecutor.submit(() -> {
+          ChildData currentData = cache.getCurrentData();
+
+          if (currentData == null) {
+            // If currentData is null, and we know we have already announced 
the data,
+            // this means that the ephemeral node was unexpectedly removed.
+            // We will recreate the node again using the previous data.
+            final byte[] previouslyAnnouncedData = announcedPaths.get(path);
+            if (previouslyAnnouncedData != null) {
+              log.info("Node[%s] dropped, reinstating.", path);
+              try {
+                createAnnouncement(path, previouslyAnnouncedData);
+              }
+              catch (Exception e) {
+                throw new RuntimeException(e);
+              }
+            }
+          }
+        })
+    );
+    return cache;
+  }
+
+  private boolean updateAnnouncedPaths(String path, byte[] bytes)
+  {
+    synchronized (toAnnounce) {
+      if (!started) {
+        return false; // Do nothing if not started
+      }
+    }
+
+    final byte[] updatedAnnouncementData = announcedPaths.compute(path, (key, 
oldBytes) -> {
+      if (oldBytes == null) {
+        return bytes; // Insert the new value
+      } else if (!Arrays.equals(oldBytes, bytes)) {
+        throw new IAE("Cannot reannounce different values under the same 
path.");
+      }
+      return oldBytes; // No change if values are equal
+    });
+
+    // Return true if we have updated the paths.
+    return Arrays.equals(updatedAnnouncementData, bytes);

Review Comment:
   note: this seems like an exception or `true` 
   ```
   currentBytes =computeIfAbsent(path, () -> bytes)
   if(Arrays.equals(oldBytes, bytes)) {
     return;
   }
   throw ...
   ```



##########
server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java:
##########
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.ZKPathsUtils;
+import org.apache.druid.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The {@link NodeAnnouncer} class is responsible for announcing a single node
+ * in a ZooKeeper ensemble. It creates an ephemeral node at a specified path
+ * and monitors its existence to ensure that it remains active until it is
+ * explicitly unannounced or the object is closed.
+ *
+ * <p>
+ * This class uses Apache Curator's NodeCache recipe under the hood to track a 
single
+ * node, along with all of its parent's status. See {@link Announcer} for an 
announcer that
+ * uses the PathChildrenCache recipe instead.
+ * </p>
+ */
+public class NodeAnnouncer
+{
+  private static final Logger log = new Logger(NodeAnnouncer.class);
+
+  private final CuratorFramework curator;
+  private final ExecutorService nodeCacheExecutor;
+
+  private final ConcurrentMap<String, NodeCache> listeners = new 
ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, byte[]> announcedPaths = new 
ConcurrentHashMap<>();
+
+  @GuardedBy("toAnnounce")
+  private boolean started = false;
+
+  /**
+   * This list holds paths that need to be announced. If a path is added to 
this list
+   * in the {@link #announce(String, byte[], boolean)} method before the 
connection to ZooKeeper is established,
+   * it will be stored here and announced later during the {@link #start} 
method.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toAnnounce = new ArrayList<>();
+
+  /**
+   * This list holds paths that need to be updated. If a path is added to this 
list
+   * in the {@link #update} method before the connection to ZooKeeper is 
established,
+   * it will be stored here and updated later during the {@link #start} method.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toUpdate = new ArrayList<>();
+
+  /**
+   * This list keeps track of all the paths created by this node announcer.
+   * When the {@link #stop} method is called,
+   * the node announcer is responsible for deleting all paths stored in this 
list.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<String> parentsIBuilt = new CopyOnWriteArrayList<>();
+
+  public NodeAnnouncer(CuratorFramework curator, ExecutorService exec)
+  {
+    this.curator = curator;
+    this.nodeCacheExecutor = exec;
+  }
+
+  @VisibleForTesting
+  Set<String> getAddedPaths()
+  {
+    return announcedPaths.keySet();
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    log.debug("Starting NodeAnnouncer");
+    synchronized (toAnnounce) {
+      if (started) {
+        log.debug("Cannot start NodeAnnouncer that has already started.");
+        return;
+      }
+
+      started = true;
+
+      for (Announceable announceable : toAnnounce) {
+        announce(announceable.path, announceable.bytes, 
announceable.removeParentsIfCreated);
+      }
+      toAnnounce.clear();
+
+      for (Announceable announceable : toUpdate) {
+        update(announceable.path, announceable.bytes);
+      }
+      toUpdate.clear();
+    }
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    log.debug("Stopping NodeAnnouncer");
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("Cannot stop NodeAnnouncer that has not started.");
+        return;
+      }
+
+      started = false;
+      closeResources();
+    }
+  }
+
+  @GuardedBy("toAnnounce")
+  private void closeResources()
+  {
+    try {
+      // Close all caches...
+      CloseableUtils.closeAll(listeners.values());
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    finally {
+      nodeCacheExecutor.shutdown();
+    }
+
+    for (String announcementPath : announcedPaths.keySet()) {
+      unannounce(announcementPath);
+    }
+
+    if (!parentsIBuilt.isEmpty()) {
+      CuratorMultiTransaction transaction = curator.transaction();
+
+      ArrayList<CuratorOp> operations = new ArrayList<>();
+      for (String parent : parentsIBuilt) {
+        try {
+          operations.add(curator.transactionOp().delete().forPath(parent));
+        }
+        catch (Exception e) {
+          log.info(e, "Unable to delete parent[%s] when closing 
NodeAnnouncer.", parent);
+        }
+      }
+
+      try {
+        transaction.forOperations(operations);
+      }
+      catch (Exception e) {
+        log.info(e, "Unable to commit transaction when closing 
NodeAnnouncer.");
+      }
+    }
+  }
+
+  /**
+   * Overload of {@link #announce(String, byte[], boolean)}, but removes 
parent node of path after announcement.
+   */
+  public void announce(String path, byte[] bytes)
+  {
+    announce(path, bytes, true);
+  }
+
+  /**
+   * Announces the provided bytes at the given path.
+   *
+   * <p>
+   * Announcement using {@link NodeAnnouncer} will create an ephemeral znode 
at the specified path, and listens for
+   * changes on your znode. Your znode will exist until it is unannounced, or 
until {@link #stop()} is called.
+   * </p>
+   *
+   * @param path                  The path to announce at
+   * @param bytes                 The payload to announce
+   * @param removeParentIfCreated remove parent of "path" if we had created 
that parent during announcement
+   */
+  public void announce(String path, byte[] bytes, boolean 
removeParentIfCreated)
+  {
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("NodeAnnouncer has not started yet, queuing announcement for 
later processing...");
+        toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated));
+        return;
+      }
+    }
+
+    final String parentPath = ZKPathsUtils.getParentPath(path);
+    byte[] announcedPayload = announcedPaths.get(path);
+
+    // If announcedPayload is null, this means that we have yet to announce 
this path.
+    // There is a possibility that the parent paths do not exist, so we check 
if we need to create the parent path first.
+    if (announcedPayload == null) {
+      boolean buildParentPath = false;
+      try {
+        buildParentPath = curator.checkExists().forPath(parentPath) == null;
+      }
+      catch (Exception e) {
+        log.warn(e, "Failed to check existence of parent path. Proceeding 
without creating parent path.");
+      }
+
+      // Synchronize to make sure that I only create a listener once.
+      synchronized (toAnnounce) {
+        if (!listeners.containsKey(path)) {
+          final NodeCache cache = setupNodeCache(path);
+
+          if (started) {
+            if (buildParentPath) {
+              createPath(parentPath, removeParentIfCreated);
+            }
+            startCache(cache);
+            listeners.put(path, cache);
+          }
+        }
+      }
+    }

Review Comment:
   can you pack this into a method like `ensureParentPathExists` or something? 
   
   I find the `Exception` handling pretty strange here and in the  `createPath` 
method
   don't really understand why continue if 
`curator.checkExists().forPath(parentPath)` returns some exception ?



##########
server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java:
##########
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.ZKPathsUtils;
+import org.apache.druid.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The {@link NodeAnnouncer} class is responsible for announcing a single node
+ * in a ZooKeeper ensemble. It creates an ephemeral node at a specified path
+ * and monitors its existence to ensure that it remains active until it is
+ * explicitly unannounced or the object is closed.
+ *
+ * <p>
+ * This class uses Apache Curator's NodeCache recipe under the hood to track a 
single
+ * node, along with all of its parent's status. See {@link Announcer} for an 
announcer that
+ * uses the PathChildrenCache recipe instead.
+ * </p>
+ */
+public class NodeAnnouncer
+{
+  private static final Logger log = new Logger(NodeAnnouncer.class);
+
+  private final CuratorFramework curator;
+  private final ExecutorService nodeCacheExecutor;
+
+  private final ConcurrentMap<String, NodeCache> listeners = new 
ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, byte[]> announcedPaths = new 
ConcurrentHashMap<>();
+
+  @GuardedBy("toAnnounce")
+  private boolean started = false;
+
+  /**
+   * This list holds paths that need to be announced. If a path is added to 
this list
+   * in the {@link #announce(String, byte[], boolean)} method before the 
connection to ZooKeeper is established,
+   * it will be stored here and announced later during the {@link #start} 
method.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toAnnounce = new ArrayList<>();
+
+  /**
+   * This list holds paths that need to be updated. If a path is added to this 
list
+   * in the {@link #update} method before the connection to ZooKeeper is 
established,
+   * it will be stored here and updated later during the {@link #start} method.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toUpdate = new ArrayList<>();
+
+  /**
+   * This list keeps track of all the paths created by this node announcer.
+   * When the {@link #stop} method is called,
+   * the node announcer is responsible for deleting all paths stored in this 
list.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<String> parentsIBuilt = new CopyOnWriteArrayList<>();
+
+  public NodeAnnouncer(CuratorFramework curator, ExecutorService exec)
+  {
+    this.curator = curator;
+    this.nodeCacheExecutor = exec;
+  }
+
+  @VisibleForTesting
+  Set<String> getAddedPaths()
+  {
+    return announcedPaths.keySet();
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    log.debug("Starting NodeAnnouncer");
+    synchronized (toAnnounce) {
+      if (started) {
+        log.debug("Cannot start NodeAnnouncer that has already started.");
+        return;
+      }
+
+      started = true;
+
+      for (Announceable announceable : toAnnounce) {
+        announce(announceable.path, announceable.bytes, 
announceable.removeParentsIfCreated);
+      }
+      toAnnounce.clear();
+
+      for (Announceable announceable : toUpdate) {
+        update(announceable.path, announceable.bytes);
+      }
+      toUpdate.clear();
+    }
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    log.debug("Stopping NodeAnnouncer");
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("Cannot stop NodeAnnouncer that has not started.");
+        return;
+      }
+
+      started = false;
+      closeResources();
+    }
+  }
+
+  @GuardedBy("toAnnounce")
+  private void closeResources()
+  {
+    try {
+      // Close all caches...
+      CloseableUtils.closeAll(listeners.values());
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    finally {
+      nodeCacheExecutor.shutdown();
+    }
+
+    for (String announcementPath : announcedPaths.keySet()) {
+      unannounce(announcementPath);
+    }
+
+    if (!parentsIBuilt.isEmpty()) {
+      CuratorMultiTransaction transaction = curator.transaction();
+
+      ArrayList<CuratorOp> operations = new ArrayList<>();
+      for (String parent : parentsIBuilt) {
+        try {
+          operations.add(curator.transactionOp().delete().forPath(parent));
+        }
+        catch (Exception e) {
+          log.info(e, "Unable to delete parent[%s] when closing 
NodeAnnouncer.", parent);
+        }
+      }
+
+      try {
+        transaction.forOperations(operations);
+      }
+      catch (Exception e) {
+        log.info(e, "Unable to commit transaction when closing 
NodeAnnouncer.");
+      }
+    }
+  }
+
+  /**
+   * Overload of {@link #announce(String, byte[], boolean)}, but removes 
parent node of path after announcement.
+   */
+  public void announce(String path, byte[] bytes)
+  {
+    announce(path, bytes, true);
+  }
+
+  /**
+   * Announces the provided bytes at the given path.
+   *
+   * <p>
+   * Announcement using {@link NodeAnnouncer} will create an ephemeral znode 
at the specified path, and listens for
+   * changes on your znode. Your znode will exist until it is unannounced, or 
until {@link #stop()} is called.
+   * </p>
+   *
+   * @param path                  The path to announce at
+   * @param bytes                 The payload to announce
+   * @param removeParentIfCreated remove parent of "path" if we had created 
that parent during announcement
+   */
+  public void announce(String path, byte[] bytes, boolean 
removeParentIfCreated)
+  {
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("NodeAnnouncer has not started yet, queuing announcement for 
later processing...");
+        toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated));
+        return;
+      }
+    }
+
+    final String parentPath = ZKPathsUtils.getParentPath(path);
+    byte[] announcedPayload = announcedPaths.get(path);
+
+    // If announcedPayload is null, this means that we have yet to announce 
this path.
+    // There is a possibility that the parent paths do not exist, so we check 
if we need to create the parent path first.
+    if (announcedPayload == null) {
+      boolean buildParentPath = false;
+      try {
+        buildParentPath = curator.checkExists().forPath(parentPath) == null;
+      }
+      catch (Exception e) {
+        log.warn(e, "Failed to check existence of parent path. Proceeding 
without creating parent path.");
+      }
+
+      // Synchronize to make sure that I only create a listener once.
+      synchronized (toAnnounce) {
+        if (!listeners.containsKey(path)) {
+          final NodeCache cache = setupNodeCache(path);
+
+          if (started) {
+            if (buildParentPath) {
+              createPath(parentPath, removeParentIfCreated);
+            }
+            startCache(cache);
+            listeners.put(path, cache);
+          }
+        }
+      }
+    }
+
+    final boolean readyToCreateAnnouncement = updateAnnouncedPaths(path, 
bytes);
+
+    if (readyToCreateAnnouncement) {
+      try {
+        createAnnouncement(path, bytes);
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @GuardedBy("toAnnounce")
+  private NodeCache setupNodeCache(String path)
+  {
+    final NodeCache cache = new NodeCache(curator, path, true);

Review Comment:
   `NodeCache` seems to be 
[deprecated](https://curator.apache.org/apidocs/org/apache/curator/framework/recipes/cache/NodeCache.html)
 ; could we start using a non-deprecated api - or is there a reason to use the 
deprecated one?



##########
server/src/main/java/org/apache/druid/server/ZKPathsUtils.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server;
+
+import org.apache.curator.utils.ZKPaths;
+
+public class ZKPathsUtils
+{
+  public static String getParentPath(String path)
+  {
+    return ZKPaths.getPathAndNode(path).getPath();
+  }
+
+  public static String getParentNode(String path)
+  {
+    return ZKPaths.getPathAndNode(path).getNode();
+  }

Review Comment:
   not sure about the value of these methods...they could be inlined - so that 
the ZK api is used directly



##########
server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorListener;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
+import org.apache.curator.test.KillSession;
+import org.apache.druid.curator.CuratorTestBase;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.ZKPathsUtils;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.Stat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;

Review Comment:
   please don't use junit4 for newly added tests



##########
server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java:
##########
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.ZKPathsUtils;
+import org.apache.druid.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The {@link NodeAnnouncer} class is responsible for announcing a single node
+ * in a ZooKeeper ensemble. It creates an ephemeral node at a specified path
+ * and monitors its existence to ensure that it remains active until it is
+ * explicitly unannounced or the object is closed.
+ *
+ * <p>
+ * This class uses Apache Curator's NodeCache recipe under the hood to track a 
single
+ * node, along with all of its parent's status. See {@link Announcer} for an 
announcer that
+ * uses the PathChildrenCache recipe instead.
+ * </p>
+ */
+public class NodeAnnouncer
+{
+  private static final Logger log = new Logger(NodeAnnouncer.class);
+
+  private final CuratorFramework curator;
+  private final ExecutorService nodeCacheExecutor;
+
+  private final ConcurrentMap<String, NodeCache> listeners = new 
ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, byte[]> announcedPaths = new 
ConcurrentHashMap<>();
+
+  @GuardedBy("toAnnounce")
+  private boolean started = false;
+
+  /**
+   * This list holds paths that need to be announced. If a path is added to 
this list
+   * in the {@link #announce(String, byte[], boolean)} method before the 
connection to ZooKeeper is established,
+   * it will be stored here and announced later during the {@link #start} 
method.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toAnnounce = new ArrayList<>();
+
+  /**
+   * This list holds paths that need to be updated. If a path is added to this 
list
+   * in the {@link #update} method before the connection to ZooKeeper is 
established,
+   * it will be stored here and updated later during the {@link #start} method.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toUpdate = new ArrayList<>();
+
+  /**
+   * This list keeps track of all the paths created by this node announcer.
+   * When the {@link #stop} method is called,
+   * the node announcer is responsible for deleting all paths stored in this 
list.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<String> parentsIBuilt = new CopyOnWriteArrayList<>();
+
+  public NodeAnnouncer(CuratorFramework curator, ExecutorService exec)
+  {
+    this.curator = curator;
+    this.nodeCacheExecutor = exec;
+  }
+
+  @VisibleForTesting
+  Set<String> getAddedPaths()
+  {
+    return announcedPaths.keySet();
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    log.debug("Starting NodeAnnouncer");
+    synchronized (toAnnounce) {
+      if (started) {
+        log.debug("Cannot start NodeAnnouncer that has already started.");
+        return;
+      }
+
+      started = true;
+
+      for (Announceable announceable : toAnnounce) {
+        announce(announceable.path, announceable.bytes, 
announceable.removeParentsIfCreated);
+      }
+      toAnnounce.clear();
+
+      for (Announceable announceable : toUpdate) {
+        update(announceable.path, announceable.bytes);
+      }
+      toUpdate.clear();
+    }
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    log.debug("Stopping NodeAnnouncer");
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("Cannot stop NodeAnnouncer that has not started.");
+        return;
+      }
+
+      started = false;
+      closeResources();
+    }
+  }
+
+  @GuardedBy("toAnnounce")
+  private void closeResources()
+  {
+    try {
+      // Close all caches...
+      CloseableUtils.closeAll(listeners.values());
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    finally {
+      nodeCacheExecutor.shutdown();
+    }
+
+    for (String announcementPath : announcedPaths.keySet()) {
+      unannounce(announcementPath);
+    }
+
+    if (!parentsIBuilt.isEmpty()) {
+      CuratorMultiTransaction transaction = curator.transaction();
+
+      ArrayList<CuratorOp> operations = new ArrayList<>();
+      for (String parent : parentsIBuilt) {
+        try {
+          operations.add(curator.transactionOp().delete().forPath(parent));
+        }
+        catch (Exception e) {
+          log.info(e, "Unable to delete parent[%s] when closing 
NodeAnnouncer.", parent);
+        }
+      }
+
+      try {
+        transaction.forOperations(operations);
+      }
+      catch (Exception e) {
+        log.info(e, "Unable to commit transaction when closing 
NodeAnnouncer.");
+      }
+    }
+  }
+
+  /**
+   * Overload of {@link #announce(String, byte[], boolean)}, but removes 
parent node of path after announcement.
+   */
+  public void announce(String path, byte[] bytes)
+  {
+    announce(path, bytes, true);
+  }
+
+  /**
+   * Announces the provided bytes at the given path.
+   *
+   * <p>
+   * Announcement using {@link NodeAnnouncer} will create an ephemeral znode 
at the specified path, and listens for
+   * changes on your znode. Your znode will exist until it is unannounced, or 
until {@link #stop()} is called.
+   * </p>
+   *
+   * @param path                  The path to announce at
+   * @param bytes                 The payload to announce
+   * @param removeParentIfCreated remove parent of "path" if we had created 
that parent during announcement
+   */
+  public void announce(String path, byte[] bytes, boolean 
removeParentIfCreated)
+  {
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("NodeAnnouncer has not started yet, queuing announcement for 
later processing...");
+        toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated));
+        return;
+      }
+    }
+
+    final String parentPath = ZKPathsUtils.getParentPath(path);
+    byte[] announcedPayload = announcedPaths.get(path);
+
+    // If announcedPayload is null, this means that we have yet to announce 
this path.
+    // There is a possibility that the parent paths do not exist, so we check 
if we need to create the parent path first.
+    if (announcedPayload == null) {
+      boolean buildParentPath = false;
+      try {
+        buildParentPath = curator.checkExists().forPath(parentPath) == null;
+      }
+      catch (Exception e) {
+        log.warn(e, "Failed to check existence of parent path. Proceeding 
without creating parent path.");
+      }
+
+      // Synchronize to make sure that I only create a listener once.
+      synchronized (toAnnounce) {
+        if (!listeners.containsKey(path)) {
+          final NodeCache cache = setupNodeCache(path);
+
+          if (started) {
+            if (buildParentPath) {
+              createPath(parentPath, removeParentIfCreated);
+            }
+            startCache(cache);
+            listeners.put(path, cache);

Review Comment:
   I don't think there is an upper limit on the number of `cache`-s ; could 
that cause any problems?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to