GWphua commented on code in PR #17482:
URL: https://github.com/apache/druid/pull/17482#discussion_r1998324342


##########
server/src/main/java/org/apache/druid/curator/announcement/Announcer.java:
##########
@@ -212,13 +234,13 @@ public void announce(String path, byte[] bytes, boolean 
removeParentIfCreated)
         }
       }
       catch (Exception e) {
-        log.debug(e, "Problem checking if the parent existed, ignoring.");
+        log.warn(e, "Failed to check existence of parent path. Proceeding 
without creating parent path.");
       }
 
       // I don't have a watcher on this path yet, create a Map and start 
watching.
       announcements.putIfAbsent(parentPath, new ConcurrentHashMap<>());
 
-      // Guaranteed to be non-null, but might be a map put in there by another 
thread.
+      // Guaranteed to be non-null, but might be a map put in here by another 
thread.

Review Comment:
   refactored.



##########
server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java:
##########
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.ZKPathsUtils;
+import org.apache.druid.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The {@link NodeAnnouncer} class is responsible for announcing a single node
+ * in a ZooKeeper ensemble. It creates an ephemeral node at a specified path
+ * and monitors its existence to ensure that it remains active until it is
+ * explicitly unannounced or the object is closed.
+ *
+ * <p>
+ * This class uses Apache Curator's NodeCache recipe under the hood to track a 
single
+ * node, along with all of its parent's status. See {@link Announcer} for an 
announcer that
+ * uses the PathChildrenCache recipe instead.
+ * </p>
+ */
+public class NodeAnnouncer
+{
+  private static final Logger log = new Logger(NodeAnnouncer.class);
+
+  private final CuratorFramework curator;
+  private final ExecutorService nodeCacheExecutor;
+
+  private final ConcurrentMap<String, NodeCache> listeners = new 
ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, byte[]> announcedPaths = new 
ConcurrentHashMap<>();
+
+  @GuardedBy("toAnnounce")
+  private boolean started = false;
+
+  /**
+   * This list holds paths that need to be announced. If a path is added to 
this list
+   * in the {@link #announce(String, byte[], boolean)} method before the 
connection to ZooKeeper is established,
+   * it will be stored here and announced later during the {@link #start} 
method.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toAnnounce = new ArrayList<>();
+
+  /**
+   * This list holds paths that need to be updated. If a path is added to this 
list
+   * in the {@link #update} method before the connection to ZooKeeper is 
established,
+   * it will be stored here and updated later during the {@link #start} method.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toUpdate = new ArrayList<>();
+
+  /**
+   * This list keeps track of all the paths created by this node announcer.
+   * When the {@link #stop} method is called,
+   * the node announcer is responsible for deleting all paths stored in this 
list.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<String> parentsIBuilt = new CopyOnWriteArrayList<>();
+
+  public NodeAnnouncer(CuratorFramework curator, ExecutorService exec)
+  {
+    this.curator = curator;
+    this.nodeCacheExecutor = exec;
+  }
+
+  @VisibleForTesting
+  Set<String> getAddedPaths()
+  {
+    return announcedPaths.keySet();
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    log.debug("Starting NodeAnnouncer");
+    synchronized (toAnnounce) {
+      if (started) {
+        log.debug("Cannot start NodeAnnouncer that has already started.");
+        return;
+      }
+
+      started = true;
+
+      for (Announceable announceable : toAnnounce) {
+        announce(announceable.path, announceable.bytes, 
announceable.removeParentsIfCreated);
+      }
+      toAnnounce.clear();
+
+      for (Announceable announceable : toUpdate) {
+        update(announceable.path, announceable.bytes);
+      }
+      toUpdate.clear();
+    }
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    log.debug("Stopping NodeAnnouncer");
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("Cannot stop NodeAnnouncer that has not started.");
+        return;
+      }
+
+      started = false;
+      closeResources();
+    }
+  }
+
+  @GuardedBy("toAnnounce")
+  private void closeResources()
+  {
+    try {
+      // Close all caches...
+      CloseableUtils.closeAll(listeners.values());
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    finally {
+      nodeCacheExecutor.shutdown();
+    }
+
+    for (String announcementPath : announcedPaths.keySet()) {
+      unannounce(announcementPath);
+    }
+
+    if (!parentsIBuilt.isEmpty()) {
+      CuratorMultiTransaction transaction = curator.transaction();
+
+      ArrayList<CuratorOp> operations = new ArrayList<>();
+      for (String parent : parentsIBuilt) {
+        try {
+          operations.add(curator.transactionOp().delete().forPath(parent));
+        }
+        catch (Exception e) {
+          log.info(e, "Unable to delete parent[%s] when closing 
NodeAnnouncer.", parent);
+        }
+      }
+
+      try {
+        transaction.forOperations(operations);
+      }
+      catch (Exception e) {
+        log.info(e, "Unable to commit transaction when closing 
NodeAnnouncer.");
+      }
+    }
+  }
+
+  /**
+   * Overload of {@link #announce(String, byte[], boolean)}, but removes 
parent node of path after announcement.
+   */
+  public void announce(String path, byte[] bytes)
+  {
+    announce(path, bytes, true);
+  }
+
+  /**
+   * Announces the provided bytes at the given path.
+   *
+   * <p>
+   * Announcement using {@link NodeAnnouncer} will create an ephemeral znode 
at the specified path, and listens for
+   * changes on your znode. Your znode will exist until it is unannounced, or 
until {@link #stop()} is called.
+   * </p>
+   *
+   * @param path                  The path to announce at
+   * @param bytes                 The payload to announce
+   * @param removeParentIfCreated remove parent of "path" if we had created 
that parent during announcement
+   */
+  public void announce(String path, byte[] bytes, boolean 
removeParentIfCreated)
+  {
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("NodeAnnouncer has not started yet, queuing announcement for 
later processing...");
+        toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated));
+        return;
+      }
+    }
+
+    final String parentPath = ZKPathsUtils.getParentPath(path);
+    byte[] announcedPayload = announcedPaths.get(path);
+
+    // If announcedPayload is null, this means that we have yet to announce 
this path.
+    // There is a possibility that the parent paths do not exist, so we check 
if we need to create the parent path first.
+    if (announcedPayload == null) {
+      boolean buildParentPath = false;
+      try {
+        buildParentPath = curator.checkExists().forPath(parentPath) == null;
+      }
+      catch (Exception e) {
+        log.warn(e, "Failed to check existence of parent path. Proceeding 
without creating parent path.");
+      }
+
+      // Synchronize to make sure that I only create a listener once.
+      synchronized (toAnnounce) {
+        if (!listeners.containsKey(path)) {
+          final NodeCache cache = setupNodeCache(path);
+
+          if (started) {
+            if (buildParentPath) {
+              createPath(parentPath, removeParentIfCreated);
+            }
+            startCache(cache);
+            listeners.put(path, cache);
+          }
+        }
+      }
+    }
+
+    final boolean readyToCreateAnnouncement = updateAnnouncedPaths(path, 
bytes);
+
+    if (readyToCreateAnnouncement) {
+      try {
+        createAnnouncement(path, bytes);
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @GuardedBy("toAnnounce")
+  private NodeCache setupNodeCache(String path)
+  {
+    final NodeCache cache = new NodeCache(curator, path, true);

Review Comment:
   Replaced with the more recent CuratorCache implementation.
   
   I also tried to migrate PathChildrenCache to CuratorCache, but I had 
problems getting the integration tests to pass when doing so. Since this PR focuses on 
NodeAnnouncer, I feel we can leave this for later.



##########
server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorListener;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
+import org.apache.curator.test.KillSession;
+import org.apache.druid.curator.CuratorTestBase;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.ZKPathsUtils;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.Stat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;

Review Comment:
   Updated to use JUnit 5.



##########
server/src/main/java/org/apache/druid/server/ZKPathsUtils.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server;
+
+import org.apache.curator.utils.ZKPaths;
+
+public class ZKPathsUtils
+{
+  public static String getParentPath(String path)
+  {
+    return ZKPaths.getPathAndNode(path).getPath();
+  }
+
+  public static String getParentNode(String path)
+  {
+    return ZKPaths.getPathAndNode(path).getNode();
+  }

Review Comment:
   Removed `ZKPathsUtils`.



##########
server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java:
##########
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.ZKPathsUtils;
+import org.apache.druid.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The {@link NodeAnnouncer} class is responsible for announcing a single node
+ * in a ZooKeeper ensemble. It creates an ephemeral node at a specified path
+ * and monitors its existence to ensure that it remains active until it is
+ * explicitly unannounced or the object is closed.
+ *
+ * <p>
+ * This class uses Apache Curator's NodeCache recipe under the hood to track a 
single
+ * node, along with all of its parent's status. See {@link Announcer} for an 
announcer that
+ * uses the PathChildrenCache recipe instead.
+ * </p>
+ */
+public class NodeAnnouncer
+{
+  private static final Logger log = new Logger(NodeAnnouncer.class);
+
+  private final CuratorFramework curator;
+  private final ExecutorService nodeCacheExecutor;
+
+  private final ConcurrentMap<String, NodeCache> listeners = new 
ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, byte[]> announcedPaths = new 
ConcurrentHashMap<>();
+
+  @GuardedBy("toAnnounce")
+  private boolean started = false;
+
+  /**
+   * This list holds paths that need to be announced. If a path is added to 
this list
+   * in the {@link #announce(String, byte[], boolean)} method before the 
connection to ZooKeeper is established,
+   * it will be stored here and announced later during the {@link #start} 
method.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toAnnounce = new ArrayList<>();
+
+  /**
+   * This list holds paths that need to be updated. If a path is added to this 
list
+   * in the {@link #update} method before the connection to ZooKeeper is 
established,
+   * it will be stored here and updated later during the {@link #start} method.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<Announceable> toUpdate = new ArrayList<>();
+
+  /**
+   * This list keeps track of all the paths created by this node announcer.
+   * When the {@link #stop} method is called,
+   * the node announcer is responsible for deleting all paths stored in this 
list.
+   */
+  @GuardedBy("toAnnounce")
+  private final List<String> parentsIBuilt = new CopyOnWriteArrayList<>();
+
+  public NodeAnnouncer(CuratorFramework curator, ExecutorService exec)
+  {
+    this.curator = curator;
+    this.nodeCacheExecutor = exec;
+  }
+
+  @VisibleForTesting
+  Set<String> getAddedPaths()
+  {
+    return announcedPaths.keySet();
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    log.debug("Starting NodeAnnouncer");
+    synchronized (toAnnounce) {
+      if (started) {
+        log.debug("Cannot start NodeAnnouncer that has already started.");
+        return;
+      }
+
+      started = true;
+
+      for (Announceable announceable : toAnnounce) {
+        announce(announceable.path, announceable.bytes, 
announceable.removeParentsIfCreated);
+      }
+      toAnnounce.clear();
+
+      for (Announceable announceable : toUpdate) {
+        update(announceable.path, announceable.bytes);
+      }
+      toUpdate.clear();
+    }
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    log.debug("Stopping NodeAnnouncer");
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("Cannot stop NodeAnnouncer that has not started.");
+        return;
+      }
+
+      started = false;
+      closeResources();
+    }
+  }
+
+  @GuardedBy("toAnnounce")
+  private void closeResources()
+  {
+    try {
+      // Close all caches...
+      CloseableUtils.closeAll(listeners.values());
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    finally {
+      nodeCacheExecutor.shutdown();
+    }
+
+    for (String announcementPath : announcedPaths.keySet()) {
+      unannounce(announcementPath);
+    }
+
+    if (!parentsIBuilt.isEmpty()) {
+      CuratorMultiTransaction transaction = curator.transaction();
+
+      ArrayList<CuratorOp> operations = new ArrayList<>();
+      for (String parent : parentsIBuilt) {
+        try {
+          operations.add(curator.transactionOp().delete().forPath(parent));
+        }
+        catch (Exception e) {
+          log.info(e, "Unable to delete parent[%s] when closing 
NodeAnnouncer.", parent);
+        }
+      }
+
+      try {
+        transaction.forOperations(operations);
+      }
+      catch (Exception e) {
+        log.info(e, "Unable to commit transaction when closing 
NodeAnnouncer.");
+      }
+    }
+  }
+
+  /**
+   * Overload of {@link #announce(String, byte[], boolean)}, but removes 
parent node of path after announcement.
+   */
+  public void announce(String path, byte[] bytes)
+  {
+    announce(path, bytes, true);
+  }
+
+  /**
+   * Announces the provided bytes at the given path.
+   *
+   * <p>
+   * Announcement using {@link NodeAnnouncer} will create an ephemeral znode 
at the specified path, and listens for
+   * changes on your znode. Your znode will exist until it is unannounced, or 
until {@link #stop()} is called.
+   * </p>
+   *
+   * @param path                  The path to announce at
+   * @param bytes                 The payload to announce
+   * @param removeParentIfCreated remove parent of "path" if we had created 
that parent during announcement
+   */
+  public void announce(String path, byte[] bytes, boolean 
removeParentIfCreated)
+  {
+    synchronized (toAnnounce) {
+      if (!started) {
+        log.debug("NodeAnnouncer has not started yet, queuing announcement for 
later processing...");
+        toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated));
+        return;
+      }
+    }
+
+    final String parentPath = ZKPathsUtils.getParentPath(path);
+    byte[] announcedPayload = announcedPaths.get(path);
+
+    // If announcedPayload is null, this means that we have yet to announce 
this path.
+    // There is a possibility that the parent paths do not exist, so we check 
if we need to create the parent path first.
+    if (announcedPayload == null) {
+      boolean buildParentPath = false;
+      try {
+        buildParentPath = curator.checkExists().forPath(parentPath) == null;
+      }
+      catch (Exception e) {
+        log.warn(e, "Failed to check existence of parent path. Proceeding 
without creating parent path.");
+      }
+
+      // Synchronize to make sure that I only create a listener once.
+      synchronized (toAnnounce) {
+        if (!listeners.containsKey(path)) {
+          final NodeCache cache = setupNodeCache(path);
+
+          if (started) {
+            if (buildParentPath) {
+              createPath(parentPath, removeParentIfCreated);
+            }
+            startCache(cache);
+            listeners.put(path, cache);
+          }
+        }
+      }
+    }

Review Comment:
   I referred to #195 when writing this. It seems that the exception does not 
affect the functionality, so I retained the behaviour from `Announcer`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to