GWphua commented on code in PR #17482:
URL: https://github.com/apache/druid/pull/17482#discussion_r1998324640


##########
server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java:
##########
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.ZKPathsUtils;
+import org.apache.druid.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The {@link NodeAnnouncer} class is responsible for announcing a single node
+ * in a ZooKeeper ensemble. It creates an ephemeral node at a specified path
+ * and monitors its existence to ensure that it remains active until it is
+ * explicitly unannounced or the object is closed.
+ *
+ * <p>
+ * This class uses Apache Curator's NodeCache recipe under the hood to track a 
single
+ * node, along with all of its parent's status. See {@link Announcer} for an 
announcer that
+ * uses the PathChildrenCache recipe instead.
+ * </p>
+ */
+public class NodeAnnouncer
+{
+  private static final Logger log = new Logger(NodeAnnouncer.class);
+
+  private final CuratorFramework curator;
+  private final ExecutorService nodeCacheExecutor;
+
+  private final ConcurrentMap<String, NodeCache> listeners = new 
ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, byte[]> announcedPaths = new 
ConcurrentHashMap<>();
+
+  @GuardedBy("toAnnounce")
+  private boolean started = false;
+
+  /**
+   * This list holds paths that need to be announced. If a path is added to 
this list
+   * in the {@link #announce(String, byte[], boolean)} method before the 
connection to ZooKeeper is established,
+   * it will be stored here and announced later during the {@link #start} 
method.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toAnnounce = new ArrayList<>();
+
+  /**
+   * This list holds paths that need to be updated. If a path is added to this 
list
+   * in the {@link #update} method before the connection to ZooKeeper is 
established,
+   * it will be stored here and updated later during the {@link #start} method.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toUpdate = new ArrayList<>();
+
+  /**
+   * This list keeps track of all the paths created by this node announcer.
+   * When the {@link #stop} method is called,
+   * the node announcer is responsible for deleting all paths stored in this 
list.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<String> parentsIBuilt = new CopyOnWriteArrayList<>();
+
+  public NodeAnnouncer(CuratorFramework curator, ExecutorService exec)
+  {
+    this.curator = curator;
+    this.nodeCacheExecutor = exec;
+  }
+
+  @VisibleForTesting
+  Set<String> getAddedPaths()
+  {
+    return announcedPaths.keySet();
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    log.debug("Starting NodeAnnouncer");
+    synchronized (toAnnounce) {
+      if (started) {
+        log.debug("Cannot start NodeAnnouncer that has already started.");
+        return;
+      }
+
+      started = true;
+
+      for (Announceable announceable : toAnnounce) {
+        announce(announceable.path, announceable.bytes, 
announceable.removeParentsIfCreated);
+      }
+      toAnnounce.clear();
+
+      for (Announceable announceable : toUpdate) {
+        update(announceable.path, announceable.bytes);
+      }
+      toUpdate.clear();
+    }
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    log.debug("Stopping NodeAnnouncer");
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("Cannot stop NodeAnnouncer that has not started.");
+        return;
+      }
+
+      started = false;
+      closeResources();
+    }
+  }
+
+  @GuardedBy("toAnnounce")
+  private void closeResources()
+  {
+    try {
+      // Close all caches...
+      CloseableUtils.closeAll(listeners.values());
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    finally {
+      nodeCacheExecutor.shutdown();
+    }
+
+    for (String announcementPath : announcedPaths.keySet()) {
+      unannounce(announcementPath);
+    }
+
+    if (!parentsIBuilt.isEmpty()) {
+      CuratorMultiTransaction transaction = curator.transaction();
+
+      ArrayList<CuratorOp> operations = new ArrayList<>();
+      for (String parent : parentsIBuilt) {
+        try {
+          operations.add(curator.transactionOp().delete().forPath(parent));
+        }
+        catch (Exception e) {
+          log.info(e, "Unable to delete parent[%s] when closing 
NodeAnnouncer.", parent);
+        }
+      }
+
+      try {
+        transaction.forOperations(operations);
+      }
+      catch (Exception e) {
+        log.info(e, "Unable to commit transaction when closing 
NodeAnnouncer.");
+      }
+    }
+  }
+
+  /**
+   * Overload of {@link #announce(String, byte[], boolean)}, but removes 
parent node of path after announcement.
+   */
+  public void announce(String path, byte[] bytes)
+  {
+    announce(path, bytes, true);
+  }
+
+  /**
+   * Announces the provided bytes at the given path.
+   *
+   * <p>
+   * Announcement using {@link NodeAnnouncer} will create an ephemeral znode 
at the specified path, and listens for
+   * changes on your znode. Your znode will exist until it is unannounced, or 
until {@link #stop()} is called.
+   * </p>
+   *
+   * @param path                  The path to announce at
+   * @param bytes                 The payload to announce
+   * @param removeParentIfCreated remove parent of "path" if we had created 
that parent during announcement
+   */
+  public void announce(String path, byte[] bytes, boolean 
removeParentIfCreated)
+  {
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("NodeAnnouncer has not started yet, queuing announcement for 
later processing...");
+        toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated));
+        return;
+      }
+    }
+
+    final String parentPath = ZKPathsUtils.getParentPath(path);
+    byte[] announcedPayload = announcedPaths.get(path);
+
+    // If announcedPayload is null, this means that we have yet to announce 
this path.
+    // There is a possibility that the parent paths do not exist, so we check 
if we need to create the parent path first.
+    if (announcedPayload == null) {
+      boolean buildParentPath = false;
+      try {
+        buildParentPath = curator.checkExists().forPath(parentPath) == null;
+      }
+      catch (Exception e) {
+        log.warn(e, "Failed to check existence of parent path. Proceeding 
without creating parent path.");
+      }
+
+      // Synchronize to make sure that I only create a listener once.
+      synchronized (toAnnounce) {
+        if (!listeners.containsKey(path)) {
+          final NodeCache cache = setupNodeCache(path);
+
+          if (started) {
+            if (buildParentPath) {
+              createPath(parentPath, removeParentIfCreated);
+            }
+            startCache(cache);
+            listeners.put(path, cache);
+          }
+        }
+      }
+    }
+
+    final boolean readyToCreateAnnouncement = updateAnnouncedPaths(path, 
bytes);
+
+    if (readyToCreateAnnouncement) {
+      try {
+        createAnnouncement(path, bytes);
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @GuardedBy("toAnnounce")
+  private NodeCache setupNodeCache(String path)
+  {
+    final NodeCache cache = new NodeCache(curator, path, true);
+    cache.getListenable().addListener(
+        () -> nodeCacheExecutor.submit(() -> {
+          ChildData currentData = cache.getCurrentData();
+
+          if (currentData == null) {
+            // If currentData is null, and we know we have already announced 
the data,
+            // this means that the ephemeral node was unexpectedly removed.
+            // We will recreate the node again using the previous data.
+            final byte[] previouslyAnnouncedData = announcedPaths.get(path);
+            if (previouslyAnnouncedData != null) {
+              log.info("Node[%s] dropped, reinstating.", path);
+              try {
+                createAnnouncement(path, previouslyAnnouncedData);
+              }
+              catch (Exception e) {
+                throw new RuntimeException(e);
+              }
+            }
+          }
+        })
+    );
+    return cache;
+  }
+
+  private boolean updateAnnouncedPaths(String path, byte[] bytes)
+  {
+    synchronized (toAnnounce) {
+      if (!started) {
+        return false; // Do nothing if not started

Review Comment:
   Made necessary changes.



##########
server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java:
##########
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.ZKPathsUtils;
+import org.apache.druid.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The {@link NodeAnnouncer} class is responsible for announcing a single node
+ * in a ZooKeeper ensemble. It creates an ephemeral node at a specified path
+ * and monitors its existence to ensure that it remains active until it is
+ * explicitly unannounced or the object is closed.
+ *
+ * <p>
+ * This class uses Apache Curator's NodeCache recipe under the hood to track a 
single
+ * node, along with all of its parent's status. See {@link Announcer} for an 
announcer that
+ * uses the PathChildrenCache recipe instead.
+ * </p>
+ */
+public class NodeAnnouncer
+{
+  private static final Logger log = new Logger(NodeAnnouncer.class);
+
+  private final CuratorFramework curator;
+  private final ExecutorService nodeCacheExecutor;
+
+  private final ConcurrentMap<String, NodeCache> listeners = new 
ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, byte[]> announcedPaths = new 
ConcurrentHashMap<>();
+
+  @GuardedBy("toAnnounce")
+  private boolean started = false;
+
+  /**
+   * This list holds paths that need to be announced. If a path is added to 
this list
+   * in the {@link #announce(String, byte[], boolean)} method before the 
connection to ZooKeeper is established,
+   * it will be stored here and announced later during the {@link #start} 
method.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toAnnounce = new ArrayList<>();
+
+  /**
+   * This list holds paths that need to be updated. If a path is added to this 
list
+   * in the {@link #update} method before the connection to ZooKeeper is 
established,
+   * it will be stored here and updated later during the {@link #start} method.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toUpdate = new ArrayList<>();
+
+  /**
+   * This list keeps track of all the paths created by this node announcer.
+   * When the {@link #stop} method is called,
+   * the node announcer is responsible for deleting all paths stored in this 
list.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<String> parentsIBuilt = new CopyOnWriteArrayList<>();
+
+  public NodeAnnouncer(CuratorFramework curator, ExecutorService exec)
+  {
+    this.curator = curator;
+    this.nodeCacheExecutor = exec;
+  }
+
+  @VisibleForTesting
+  Set<String> getAddedPaths()
+  {
+    return announcedPaths.keySet();
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    log.debug("Starting NodeAnnouncer");
+    synchronized (toAnnounce) {
+      if (started) {
+        log.debug("Cannot start NodeAnnouncer that has already started.");
+        return;
+      }
+
+      started = true;
+
+      for (Announceable announceable : toAnnounce) {
+        announce(announceable.path, announceable.bytes, 
announceable.removeParentsIfCreated);
+      }
+      toAnnounce.clear();
+
+      for (Announceable announceable : toUpdate) {
+        update(announceable.path, announceable.bytes);
+      }
+      toUpdate.clear();
+    }
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    log.debug("Stopping NodeAnnouncer");
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("Cannot stop NodeAnnouncer that has not started.");
+        return;
+      }
+
+      started = false;
+      closeResources();
+    }
+  }
+
+  @GuardedBy("toAnnounce")
+  private void closeResources()
+  {
+    try {
+      // Close all caches...
+      CloseableUtils.closeAll(listeners.values());
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    finally {
+      nodeCacheExecutor.shutdown();
+    }
+
+    for (String announcementPath : announcedPaths.keySet()) {
+      unannounce(announcementPath);
+    }
+
+    if (!parentsIBuilt.isEmpty()) {
+      CuratorMultiTransaction transaction = curator.transaction();
+
+      ArrayList<CuratorOp> operations = new ArrayList<>();
+      for (String parent : parentsIBuilt) {
+        try {
+          operations.add(curator.transactionOp().delete().forPath(parent));
+        }
+        catch (Exception e) {
+          log.info(e, "Unable to delete parent[%s] when closing 
NodeAnnouncer.", parent);
+        }
+      }
+
+      try {
+        transaction.forOperations(operations);
+      }
+      catch (Exception e) {
+        log.info(e, "Unable to commit transaction when closing 
NodeAnnouncer.");
+      }
+    }
+  }
+
+  /**
+   * Overload of {@link #announce(String, byte[], boolean)}, but removes 
parent node of path after announcement.
+   */
+  public void announce(String path, byte[] bytes)
+  {
+    announce(path, bytes, true);
+  }
+
+  /**
+   * Announces the provided bytes at the given path.
+   *
+   * <p>
+   * Announcement using {@link NodeAnnouncer} will create an ephemeral znode 
at the specified path, and listens for
+   * changes on your znode. Your znode will exist until it is unannounced, or 
until {@link #stop()} is called.
+   * </p>
+   *
+   * @param path                  The path to announce at
+   * @param bytes                 The payload to announce
+   * @param removeParentIfCreated remove parent of "path" if we had created 
that parent during announcement
+   */
+  public void announce(String path, byte[] bytes, boolean 
removeParentIfCreated)
+  {
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("NodeAnnouncer has not started yet, queuing announcement for 
later processing...");
+        toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated));
+        return;
+      }
+    }
+
+    final String parentPath = ZKPathsUtils.getParentPath(path);
+    byte[] announcedPayload = announcedPaths.get(path);
+
+    // If announcedPayload is null, this means that we have yet to announce 
this path.
+    // There is a possibility that the parent paths do not exist, so we check 
if we need to create the parent path first.
+    if (announcedPayload == null) {
+      boolean buildParentPath = false;
+      try {
+        buildParentPath = curator.checkExists().forPath(parentPath) == null;
+      }
+      catch (Exception e) {
+        log.warn(e, "Failed to check existence of parent path. Proceeding 
without creating parent path.");
+      }
+
+      // Synchronize to make sure that I only create a listener once.
+      synchronized (toAnnounce) {
+        if (!listeners.containsKey(path)) {
+          final NodeCache cache = setupNodeCache(path);
+
+          if (started) {
+            if (buildParentPath) {
+              createPath(parentPath, removeParentIfCreated);
+            }
+            startCache(cache);
+            listeners.put(path, cache);

Review Comment:
   I don't think we will reach (2^32) - 1 elements. Besides, this PR aims to 
reduce memory consumption, so we should be safe.



##########
server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java:
##########
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.ZKPathsUtils;
+import org.apache.druid.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The {@link NodeAnnouncer} class is responsible for announcing a single node
+ * in a ZooKeeper ensemble. It creates an ephemeral node at a specified path
+ * and monitors its existence to ensure that it remains active until it is
+ * explicitly unannounced or the object is closed.
+ *
+ * <p>
+ * This class uses Apache Curator's NodeCache recipe under the hood to track a 
single
+ * node, along with all of its parent's status. See {@link Announcer} for an 
announcer that
+ * uses the PathChildrenCache recipe instead.
+ * </p>
+ */
+public class NodeAnnouncer
+{
+  private static final Logger log = new Logger(NodeAnnouncer.class);
+
+  private final CuratorFramework curator;
+  private final ExecutorService nodeCacheExecutor;
+
+  private final ConcurrentMap<String, NodeCache> listeners = new 
ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, byte[]> announcedPaths = new 
ConcurrentHashMap<>();
+
+  @GuardedBy("toAnnounce")
+  private boolean started = false;
+
+  /**
+   * This list holds paths that need to be announced. If a path is added to 
this list
+   * in the {@link #announce(String, byte[], boolean)} method before the 
connection to ZooKeeper is established,
+   * it will be stored here and announced later during the {@link #start} 
method.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toAnnounce = new ArrayList<>();
+
+  /**
+   * This list holds paths that need to be updated. If a path is added to this 
list
+   * in the {@link #update} method before the connection to ZooKeeper is 
established,
+   * it will be stored here and updated later during the {@link #start} method.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toUpdate = new ArrayList<>();
+
+  /**
+   * This list keeps track of all the paths created by this node announcer.
+   * When the {@link #stop} method is called,
+   * the node announcer is responsible for deleting all paths stored in this 
list.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<String> parentsIBuilt = new CopyOnWriteArrayList<>();
+
+  public NodeAnnouncer(CuratorFramework curator, ExecutorService exec)
+  {
+    this.curator = curator;
+    this.nodeCacheExecutor = exec;
+  }
+
+  @VisibleForTesting
+  Set<String> getAddedPaths()
+  {
+    return announcedPaths.keySet();
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    log.debug("Starting NodeAnnouncer");
+    synchronized (toAnnounce) {
+      if (started) {
+        log.debug("Cannot start NodeAnnouncer that has already started.");
+        return;
+      }
+
+      started = true;
+
+      for (Announceable announceable : toAnnounce) {
+        announce(announceable.path, announceable.bytes, 
announceable.removeParentsIfCreated);
+      }
+      toAnnounce.clear();
+
+      for (Announceable announceable : toUpdate) {
+        update(announceable.path, announceable.bytes);
+      }
+      toUpdate.clear();
+    }
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    log.debug("Stopping NodeAnnouncer");
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("Cannot stop NodeAnnouncer that has not started.");
+        return;
+      }
+
+      started = false;
+      closeResources();
+    }
+  }
+
+  @GuardedBy("toAnnounce")
+  private void closeResources()
+  {
+    try {
+      // Close all caches...
+      CloseableUtils.closeAll(listeners.values());
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    finally {
+      nodeCacheExecutor.shutdown();
+    }
+
+    for (String announcementPath : announcedPaths.keySet()) {
+      unannounce(announcementPath);
+    }
+
+    if (!parentsIBuilt.isEmpty()) {
+      CuratorMultiTransaction transaction = curator.transaction();
+
+      ArrayList<CuratorOp> operations = new ArrayList<>();
+      for (String parent : parentsIBuilt) {
+        try {
+          operations.add(curator.transactionOp().delete().forPath(parent));
+        }
+        catch (Exception e) {
+          log.info(e, "Unable to delete parent[%s] when closing 
NodeAnnouncer.", parent);
+        }
+      }
+
+      try {
+        transaction.forOperations(operations);
+      }
+      catch (Exception e) {
+        log.info(e, "Unable to commit transaction when closing 
NodeAnnouncer.");
+      }
+    }
+  }
+
+  /**
+   * Overload of {@link #announce(String, byte[], boolean)}, but removes 
parent node of path after announcement.
+   */
+  public void announce(String path, byte[] bytes)
+  {
+    announce(path, bytes, true);
+  }
+
+  /**
+   * Announces the provided bytes at the given path.
+   *
+   * <p>
+   * Announcement using {@link NodeAnnouncer} will create an ephemeral znode 
at the specified path, and listens for
+   * changes on your znode. Your znode will exist until it is unannounced, or 
until {@link #stop()} is called.
+   * </p>
+   *
+   * @param path                  The path to announce at
+   * @param bytes                 The payload to announce
+   * @param removeParentIfCreated remove parent of "path" if we had created 
that parent during announcement
+   */
+  public void announce(String path, byte[] bytes, boolean 
removeParentIfCreated)
+  {
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("NodeAnnouncer has not started yet, queuing announcement for 
later processing...");
+        toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated));
+        return;
+      }
+    }
+
+    final String parentPath = ZKPathsUtils.getParentPath(path);
+    byte[] announcedPayload = announcedPaths.get(path);
+
+    // If announcedPayload is null, this means that we have yet to announce 
this path.
+    // There is a possibility that the parent paths do not exist, so we check 
if we need to create the parent path first.
+    if (announcedPayload == null) {
+      boolean buildParentPath = false;
+      try {
+        buildParentPath = curator.checkExists().forPath(parentPath) == null;
+      }
+      catch (Exception e) {
+        log.warn(e, "Failed to check existence of parent path. Proceeding 
without creating parent path.");
+      }
+
+      // Synchronize to make sure that I only create a listener once.
+      synchronized (toAnnounce) {
+        if (!listeners.containsKey(path)) {
+          final NodeCache cache = setupNodeCache(path);
+
+          if (started) {
+            if (buildParentPath) {
+              createPath(parentPath, removeParentIfCreated);
+            }
+            startCache(cache);
+            listeners.put(path, cache);
+          }
+        }
+      }
+    }
+
+    final boolean readyToCreateAnnouncement = updateAnnouncedPaths(path, 
bytes);
+
+    if (readyToCreateAnnouncement) {
+      try {
+        createAnnouncement(path, bytes);
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @GuardedBy("toAnnounce")
+  private NodeCache setupNodeCache(String path)
+  {
+    final NodeCache cache = new NodeCache(curator, path, true);
+    cache.getListenable().addListener(
+        () -> nodeCacheExecutor.submit(() -> {
+          ChildData currentData = cache.getCurrentData();
+
+          if (currentData == null) {
+            // If currentData is null, and we know we have already announced 
the data,
+            // this means that the ephemeral node was unexpectedly removed.
+            // We will recreate the node again using the previous data.
+            final byte[] previouslyAnnouncedData = announcedPaths.get(path);
+            if (previouslyAnnouncedData != null) {
+              log.info("Node[%s] dropped, reinstating.", path);
+              try {
+                createAnnouncement(path, previouslyAnnouncedData);
+              }
+              catch (Exception e) {
+                throw new RuntimeException(e);
+              }
+            }
+          }
+        })
+    );
+    return cache;
+  }
+
+  private boolean updateAnnouncedPaths(String path, byte[] bytes)
+  {
+    synchronized (toAnnounce) {
+      if (!started) {
+        return false; // Do nothing if not started
+      }
+    }
+
+    final byte[] updatedAnnouncementData = announcedPaths.compute(path, (key, 
oldBytes) -> {
+      if (oldBytes == null) {
+        return bytes; // Insert the new value
+      } else if (!Arrays.equals(oldBytes, bytes)) {
+        throw new IAE("Cannot reannounce different values under the same 
path.");
+      }
+      return oldBytes; // No change if values are equal
+    });
+
+    // Return true if we have updated the paths.
+    return Arrays.equals(updatedAnnouncementData, bytes);

Review Comment:
   Changed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to