GWphua commented on code in PR #17482:
URL: https://github.com/apache/druid/pull/17482#discussion_r1998324342
##########
server/src/main/java/org/apache/druid/curator/announcement/Announcer.java:
##########
@@ -212,13 +234,13 @@ public void announce(String path, byte[] bytes, boolean
removeParentIfCreated)
}
}
catch (Exception e) {
- log.debug(e, "Problem checking if the parent existed, ignoring.");
+ log.warn(e, "Failed to check existence of parent path. Proceeding
without creating parent path.");
}
// I don't have a watcher on this path yet, create a Map and start
watching.
announcements.putIfAbsent(parentPath, new ConcurrentHashMap<>());
- // Guaranteed to be non-null, but might be a map put in there by another
thread.
+ // Guaranteed to be non-null, but might be a map put in here by another
thread.
Review Comment:
refactored.
##########
server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java:
##########
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.ZKPathsUtils;
+import org.apache.druid.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The {@link NodeAnnouncer} class is responsible for announcing a single node
+ * in a ZooKeeper ensemble. It creates an ephemeral node at a specified path
+ * and monitors its existence to ensure that it remains active until it is
+ * explicitly unannounced or the object is closed.
+ *
+ * <p>
+ * This class uses Apache Curator's NodeCache recipe under the hood to track a
single
+ * node, along with all of its parent's status. See {@link Announcer} for an
announcer that
+ * uses the PathChildrenCache recipe instead.
+ * </p>
+ */
+public class NodeAnnouncer
+{
+ private static final Logger log = new Logger(NodeAnnouncer.class);
+
+ private final CuratorFramework curator;
+ private final ExecutorService nodeCacheExecutor;
+
+ private final ConcurrentMap<String, NodeCache> listeners = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, byte[]> announcedPaths = new
ConcurrentHashMap<>();
+
+ @GuardedBy("toAnnounce")
+ private boolean started = false;
+
+ /**
+ * This list holds paths that need to be announced. If a path is added to
this list
+ * in the {@link #announce(String, byte[], boolean)} method before the
connection to ZooKeeper is established,
+ * it will be stored here and announced later during the {@link #start}
method.
+ */
+ @GuardedBy("toAnnounce")
+ private final List<Announceable> toAnnounce = new ArrayList<>();
+
+ /**
+ * This list holds paths that need to be updated. If a path is added to this
list
+ * in the {@link #update} method before the connection to ZooKeeper is
established,
+ * it will be stored here and updated later during the {@link #start} method.
+ */
+ @GuardedBy("toAnnounce")
+ private final List<Announceable> toUpdate = new ArrayList<>();
+
+ /**
+ * This list keeps track of all the paths created by this node announcer.
+ * When the {@link #stop} method is called,
+ * the node announcer is responsible for deleting all paths stored in this
list.
+ */
+ @GuardedBy("toAnnounce")
+ private final List<String> parentsIBuilt = new CopyOnWriteArrayList<>();
+
+ public NodeAnnouncer(CuratorFramework curator, ExecutorService exec)
+ {
+ this.curator = curator;
+ this.nodeCacheExecutor = exec;
+ }
+
+ @VisibleForTesting
+ Set<String> getAddedPaths()
+ {
+ return announcedPaths.keySet();
+ }
+
+ @LifecycleStart
+ public void start()
+ {
+ log.debug("Starting NodeAnnouncer");
+ synchronized (toAnnounce) {
+ if (started) {
+ log.debug("Cannot start NodeAnnouncer that has already started.");
+ return;
+ }
+
+ started = true;
+
+ for (Announceable announceable : toAnnounce) {
+ announce(announceable.path, announceable.bytes,
announceable.removeParentsIfCreated);
+ }
+ toAnnounce.clear();
+
+ for (Announceable announceable : toUpdate) {
+ update(announceable.path, announceable.bytes);
+ }
+ toUpdate.clear();
+ }
+ }
+
+ @LifecycleStop
+ public void stop()
+ {
+ log.debug("Stopping NodeAnnouncer");
+ synchronized (toAnnounce) {
+ if (!started) {
+ log.debug("Cannot stop NodeAnnouncer that has not started.");
+ return;
+ }
+
+ started = false;
+ closeResources();
+ }
+ }
+
+ @GuardedBy("toAnnounce")
+ private void closeResources()
+ {
+ try {
+ // Close all caches...
+ CloseableUtils.closeAll(listeners.values());
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ finally {
+ nodeCacheExecutor.shutdown();
+ }
+
+ for (String announcementPath : announcedPaths.keySet()) {
+ unannounce(announcementPath);
+ }
+
+ if (!parentsIBuilt.isEmpty()) {
+ CuratorMultiTransaction transaction = curator.transaction();
+
+ ArrayList<CuratorOp> operations = new ArrayList<>();
+ for (String parent : parentsIBuilt) {
+ try {
+ operations.add(curator.transactionOp().delete().forPath(parent));
+ }
+ catch (Exception e) {
+ log.info(e, "Unable to delete parent[%s] when closing
NodeAnnouncer.", parent);
+ }
+ }
+
+ try {
+ transaction.forOperations(operations);
+ }
+ catch (Exception e) {
+ log.info(e, "Unable to commit transaction when closing
NodeAnnouncer.");
+ }
+ }
+ }
+
+ /**
+ * Overload of {@link #announce(String, byte[], boolean)}, but removes
parent node of path after announcement.
+ */
+ public void announce(String path, byte[] bytes)
+ {
+ announce(path, bytes, true);
+ }
+
+ /**
+ * Announces the provided bytes at the given path.
+ *
+ * <p>
+ * Announcement using {@link NodeAnnouncer} will create an ephemeral znode
at the specified path, and listens for
+ * changes on your znode. Your znode will exist until it is unannounced, or
until {@link #stop()} is called.
+ * </p>
+ *
+ * @param path The path to announce at
+ * @param bytes The payload to announce
+ * @param removeParentIfCreated remove parent of "path" if we had created
that parent during announcement
+ */
+ public void announce(String path, byte[] bytes, boolean
removeParentIfCreated)
+ {
+ synchronized (toAnnounce) {
+ if (!started) {
+ log.debug("NodeAnnouncer has not started yet, queuing announcement for
later processing...");
+ toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated));
+ return;
+ }
+ }
+
+ final String parentPath = ZKPathsUtils.getParentPath(path);
+ byte[] announcedPayload = announcedPaths.get(path);
+
+ // If announcedPayload is null, this means that we have yet to announce
this path.
+ // There is a possibility that the parent paths do not exist, so we check
if we need to create the parent path first.
+ if (announcedPayload == null) {
+ boolean buildParentPath = false;
+ try {
+ buildParentPath = curator.checkExists().forPath(parentPath) == null;
+ }
+ catch (Exception e) {
+ log.warn(e, "Failed to check existence of parent path. Proceeding
without creating parent path.");
+ }
+
+ // Synchronize to make sure that I only create a listener once.
+ synchronized (toAnnounce) {
+ if (!listeners.containsKey(path)) {
+ final NodeCache cache = setupNodeCache(path);
+
+ if (started) {
+ if (buildParentPath) {
+ createPath(parentPath, removeParentIfCreated);
+ }
+ startCache(cache);
+ listeners.put(path, cache);
+ }
+ }
+ }
+ }
+
+ final boolean readyToCreateAnnouncement = updateAnnouncedPaths(path,
bytes);
+
+ if (readyToCreateAnnouncement) {
+ try {
+ createAnnouncement(path, bytes);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @GuardedBy("toAnnounce")
+ private NodeCache setupNodeCache(String path)
+ {
+ final NodeCache cache = new NodeCache(curator, path, true);
Review Comment:
Replaced with the more recent CuratorCache implementation.
I have tried to update PathChildrenCache to CuratorCache too, but I have
problem passing the integration tests when doing so. Since this PR focuses on
NodeAnnouncer, I feel we can leave this for later.
##########
server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorListener;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
+import org.apache.curator.test.KillSession;
+import org.apache.druid.curator.CuratorTestBase;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.ZKPathsUtils;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.Stat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
Review Comment:
Updated with JUnit5
##########
server/src/main/java/org/apache/druid/server/ZKPathsUtils.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server;
+
+import org.apache.curator.utils.ZKPaths;
+
+public class ZKPathsUtils
+{
+ public static String getParentPath(String path)
+ {
+ return ZKPaths.getPathAndNode(path).getPath();
+ }
+
+ public static String getParentNode(String path)
+ {
+ return ZKPaths.getPathAndNode(path).getNode();
+ }
Review Comment:
Removed `ZKPathsUtil`
##########
server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java:
##########
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.ZKPathsUtils;
+import org.apache.druid.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The {@link NodeAnnouncer} class is responsible for announcing a single node
+ * in a ZooKeeper ensemble. It creates an ephemeral node at a specified path
+ * and monitors its existence to ensure that it remains active until it is
+ * explicitly unannounced or the object is closed.
+ *
+ * <p>
+ * This class uses Apache Curator's NodeCache recipe under the hood to track a
single
+ * node, along with all of its parent's status. See {@link Announcer} for an
announcer that
+ * uses the PathChildrenCache recipe instead.
+ * </p>
+ */
+public class NodeAnnouncer
+{
+ private static final Logger log = new Logger(NodeAnnouncer.class);
+
+ private final CuratorFramework curator;
+ private final ExecutorService nodeCacheExecutor;
+
+ private final ConcurrentMap<String, NodeCache> listeners = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, byte[]> announcedPaths = new
ConcurrentHashMap<>();
+
+ @GuardedBy("toAnnounce")
+ private boolean started = false;
+
+ /**
+ * This list holds paths that need to be announced. If a path is added to
this list
+ * in the {@link #announce(String, byte[], boolean)} method before the
connection to ZooKeeper is established,
+ * it will be stored here and announced later during the {@link #start}
method.
+ */
+ @GuardedBy("toAnnounce")
+ private final List<Announceable> toAnnounce = new ArrayList<>();
+
+ /**
+ * This list holds paths that need to be updated. If a path is added to this
list
+ * in the {@link #update} method before the connection to ZooKeeper is
established,
+ * it will be stored here and updated later during the {@link #start} method.
+ */
+ @GuardedBy("toAnnounce")
+ private final List<Announceable> toUpdate = new ArrayList<>();
+
+ /**
+ * This list keeps track of all the paths created by this node announcer.
+ * When the {@link #stop} method is called,
+ * the node announcer is responsible for deleting all paths stored in this
list.
+ */
+ @GuardedBy("toAnnounce")
+ private final List<String> parentsIBuilt = new CopyOnWriteArrayList<>();
+
+ public NodeAnnouncer(CuratorFramework curator, ExecutorService exec)
+ {
+ this.curator = curator;
+ this.nodeCacheExecutor = exec;
+ }
+
+ @VisibleForTesting
+ Set<String> getAddedPaths()
+ {
+ return announcedPaths.keySet();
+ }
+
+ @LifecycleStart
+ public void start()
+ {
+ log.debug("Starting NodeAnnouncer");
+ synchronized (toAnnounce) {
+ if (started) {
+ log.debug("Cannot start NodeAnnouncer that has already started.");
+ return;
+ }
+
+ started = true;
+
+ for (Announceable announceable : toAnnounce) {
+ announce(announceable.path, announceable.bytes,
announceable.removeParentsIfCreated);
+ }
+ toAnnounce.clear();
+
+ for (Announceable announceable : toUpdate) {
+ update(announceable.path, announceable.bytes);
+ }
+ toUpdate.clear();
+ }
+ }
+
+ @LifecycleStop
+ public void stop()
+ {
+ log.debug("Stopping NodeAnnouncer");
+ synchronized (toAnnounce) {
+ if (!started) {
+ log.debug("Cannot stop NodeAnnouncer that has not started.");
+ return;
+ }
+
+ started = false;
+ closeResources();
+ }
+ }
+
+ @GuardedBy("toAnnounce")
+ private void closeResources()
+ {
+ try {
+ // Close all caches...
+ CloseableUtils.closeAll(listeners.values());
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ finally {
+ nodeCacheExecutor.shutdown();
+ }
+
+ for (String announcementPath : announcedPaths.keySet()) {
+ unannounce(announcementPath);
+ }
+
+ if (!parentsIBuilt.isEmpty()) {
+ CuratorMultiTransaction transaction = curator.transaction();
+
+ ArrayList<CuratorOp> operations = new ArrayList<>();
+ for (String parent : parentsIBuilt) {
+ try {
+ operations.add(curator.transactionOp().delete().forPath(parent));
+ }
+ catch (Exception e) {
+ log.info(e, "Unable to delete parent[%s] when closing
NodeAnnouncer.", parent);
+ }
+ }
+
+ try {
+ transaction.forOperations(operations);
+ }
+ catch (Exception e) {
+ log.info(e, "Unable to commit transaction when closing
NodeAnnouncer.");
+ }
+ }
+ }
+
+ /**
+ * Overload of {@link #announce(String, byte[], boolean)}, but removes
parent node of path after announcement.
+ */
+ public void announce(String path, byte[] bytes)
+ {
+ announce(path, bytes, true);
+ }
+
+ /**
+ * Announces the provided bytes at the given path.
+ *
+ * <p>
+ * Announcement using {@link NodeAnnouncer} will create an ephemeral znode
at the specified path, and listens for
+ * changes on your znode. Your znode will exist until it is unannounced, or
until {@link #stop()} is called.
+ * </p>
+ *
+ * @param path The path to announce at
+ * @param bytes The payload to announce
+ * @param removeParentIfCreated remove parent of "path" if we had created
that parent during announcement
+ */
+ public void announce(String path, byte[] bytes, boolean
removeParentIfCreated)
+ {
+ synchronized (toAnnounce) {
+ if (!started) {
+ log.debug("NodeAnnouncer has not started yet, queuing announcement for
later processing...");
+ toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated));
+ return;
+ }
+ }
+
+ final String parentPath = ZKPathsUtils.getParentPath(path);
+ byte[] announcedPayload = announcedPaths.get(path);
+
+ // If announcedPayload is null, this means that we have yet to announce
this path.
+ // There is a possibility that the parent paths do not exist, so we check
if we need to create the parent path first.
+ if (announcedPayload == null) {
+ boolean buildParentPath = false;
+ try {
+ buildParentPath = curator.checkExists().forPath(parentPath) == null;
+ }
+ catch (Exception e) {
+ log.warn(e, "Failed to check existence of parent path. Proceeding
without creating parent path.");
+ }
+
+ // Synchronize to make sure that I only create a listener once.
+ synchronized (toAnnounce) {
+ if (!listeners.containsKey(path)) {
+ final NodeCache cache = setupNodeCache(path);
+
+ if (started) {
+ if (buildParentPath) {
+ createPath(parentPath, removeParentIfCreated);
+ }
+ startCache(cache);
+ listeners.put(path, cache);
+ }
+ }
+ }
+ }
Review Comment:
I referred to #195 when writing this. Seems like the exception does not
affect the functionality, so I retained the behaviour from `Announcer`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]