This is an automated email from the ASF dual-hosted git repository.
kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 228304eb058 Fix Huge Number of Watches in ZooKeeper (#17482)
228304eb058 is described below
commit 228304eb058df54ec22a4de028a62eeda92f7f74
Author: Virushade <[email protected]>
AuthorDate: Thu May 8 14:52:13 2025 +0800
Fix Huge Number of Watches in ZooKeeper (#17482)
---------
Co-authored-by: asdf2014 <[email protected]>
---
docs/api-reference/tasks-api.md | 4 +-
docs/configuration/index.md | 1 +
.../indexing/worker/WorkerCuratorCoordinator.java | 12 +-
.../indexing/worker/WorkerTaskMonitorTest.java | 3 +
.../indexing/worker/http/WorkerResourceTest.java | 3 +
.../org/apache/druid/curator/CuratorConfig.java | 8 +
.../druid/curator/announcement/Announceable.java | 56 +++
.../druid/curator/announcement/NodeAnnouncer.java | 404 +++++++++++++++++++++
.../{Announcer.java => PathChildrenAnnouncer.java} | 109 +++---
.../curator/announcement/ServiceAnnouncer.java | 35 ++
.../discovery/CuratorDruidNodeAnnouncer.java | 11 +-
.../org/apache/druid/guice/AnnouncerModule.java | 30 +-
.../guice/annotations/DirectExecutorAnnouncer.java | 34 ++
.../guice/annotations/SingleThreadedAnnouncer.java | 34 ++
.../coordination/BatchDataSegmentAnnouncer.java | 9 +-
.../CuratorDataSegmentServerAnnouncer.java | 7 +-
.../client/BatchServerInventoryViewTest.java | 24 +-
.../curator/announcement/NodeAnnouncerTest.java | 367 +++++++++++++++++++
...cerTest.java => PathChildrenAnnouncerTest.java} | 95 ++---
.../CuratorDruidNodeAnnouncerAndDiscoveryTest.java | 7 +-
.../BatchDataSegmentAnnouncerTest.java | 21 +-
website/.spelling | 2 +
22 files changed, 1125 insertions(+), 151 deletions(-)
diff --git a/docs/api-reference/tasks-api.md b/docs/api-reference/tasks-api.md
index d94be5b0c5f..69a0a015361 100644
--- a/docs/api-reference/tasks-api.md
+++ b/docs/api-reference/tasks-api.md
@@ -1059,9 +1059,9 @@ Host: http://ROUTER_IP:ROUTER_PORT
2023-07-03T22:11:17,933 INFO [task-runner-0-priority-0]
org.apache.kafka.common.metrics.Metrics - Closing reporter
org.apache.kafka.common.metrics.JmxReporter
2023-07-03T22:11:17,933 INFO [task-runner-0-priority-0]
org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
2023-07-03T22:11:17,935 INFO [task-runner-0-priority-0]
org.apache.kafka.common.utils.AppInfoParser - App info kafka.consumer for
consumer-kafka-supervisor-dcanhmig-1 unregistered
- 2023-07-03T22:11:17,936 INFO [task-runner-0-priority-0]
org.apache.druid.curator.announcement.Announcer - Unannouncing
[/druid/internal-discovery/PEON/localhost:8100]
+ 2023-07-03T22:11:17,936 INFO [task-runner-0-priority-0]
org.apache.druid.curator.announcement.PathChildrenAnnouncer - Unannouncing
[/druid/internal-discovery/PEON/localhost:8100]
2023-07-03T22:11:17,972 INFO [task-runner-0-priority-0]
org.apache.druid.curator.discovery.CuratorDruidNodeAnnouncer - Unannounced self
[{"druidNode":{"service":"druid/middleManager","host":"localhost","bindOnHost":false,"plaintextPort":8100,"port":-1,"tlsPort":-1,"enablePlaintextPort":true,"enableTlsPort":false},"nodeType":"peon","services":{"dataNodeService":{"type":"dataNodeService","tier":"_default_tier","maxSize":0,"type":"indexer-executor","serverType":"indexer-executor","prior
[...]
- 2023-07-03T22:11:17,972 INFO [task-runner-0-priority-0]
org.apache.druid.curator.announcement.Announcer - Unannouncing
[/druid/announcements/localhost:8100]
+ 2023-07-03T22:11:17,972 INFO [task-runner-0-priority-0]
org.apache.druid.curator.announcement.PathChildrenAnnouncer - Unannouncing
[/druid/announcements/localhost:8100]
2023-07-03T22:11:17,996 INFO [task-runner-0-priority-0]
org.apache.druid.indexing.worker.executor.ExecutorLifecycle - Task completed
with status: {
"id" : "index_kafka_social_media_0e905aa31037879_nommnaeg",
"status" : "SUCCESS",
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index c7ddc73efa0..1a7a1e1516e 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -148,6 +148,7 @@ We recommend just setting the base ZK path and the ZK
service host, but all ZK p
|`druid.zk.service.connectionTimeoutMs`|ZooKeeper connection timeout, in
milliseconds.|`15000`|
|`druid.zk.service.compress`|Boolean flag for whether or not created Znodes
should be compressed.|`true`|
|`druid.zk.service.acl`|Boolean flag for whether or not to enable ACL security
for ZooKeeper. If ACL is enabled, zNode creators will have all
permissions.|`false`|
+|`druid.zk.service.pathChildrenCacheStrategy`|Dictates the underlying caching
strategy for service announcements. Set true to let announcers to use Apache
Curator's PathChildrenCache strategy, otherwise NodeCache strategy. Consider
using NodeCache strategy when you are dealing with huge number of ZooKeeper
watches in your cluster.|`true`|
#### Path configuration
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java
index 7e7c09893b9..c018c6a1a63 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java
@@ -25,11 +25,11 @@ import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.curator.CuratorUtils;
-import org.apache.druid.curator.announcement.Announcer;
+import org.apache.druid.curator.announcement.ServiceAnnouncer;
+import org.apache.druid.guice.annotations.DirectExecutorAnnouncer;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
@@ -54,7 +54,7 @@ public class WorkerCuratorCoordinator
private final ObjectMapper jsonMapper;
private final RemoteTaskRunnerConfig config;
private final CuratorFramework curatorFramework;
- private final Announcer announcer;
+ private final ServiceAnnouncer announcer;
private final String baseAnnouncementsPath;
private final String baseTaskPath;
@@ -69,6 +69,7 @@ public class WorkerCuratorCoordinator
IndexerZkConfig indexerZkConfig,
RemoteTaskRunnerConfig config,
CuratorFramework curatorFramework,
+ @DirectExecutorAnnouncer ServiceAnnouncer announcer,
Worker worker
)
{
@@ -76,8 +77,7 @@ public class WorkerCuratorCoordinator
this.config = config;
this.curatorFramework = curatorFramework;
this.worker = worker;
-
- this.announcer = new Announcer(curatorFramework, Execs.directExecutor());
+ this.announcer = announcer;
this.baseAnnouncementsPath =
getPath(Arrays.asList(indexerZkConfig.getAnnouncementsPath(),
worker.getHost()));
this.baseTaskPath = getPath(Arrays.asList(indexerZkConfig.getTasksPath(),
worker.getHost()));
@@ -87,7 +87,7 @@ public class WorkerCuratorCoordinator
@LifecycleStart
public void start() throws Exception
{
- log.info("WorkerCuratorCoordinator good to go sir. Server[%s]",
worker.getHost());
+ log.info("WorkerCuratorCoordinator good to go. Server[%s]",
worker.getHost());
synchronized (lock) {
if (started) {
return;
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
index 6c0462dcf50..79d96fde860 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -28,6 +28,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.curator.PotentiallyGzippedCompressionProvider;
+import org.apache.druid.curator.announcement.NodeAnnouncer;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.common.IndexingServiceCondition;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
@@ -47,6 +48,7 @@ import
org.apache.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.query.policy.NoopPolicyEnforcer;
import org.apache.druid.rpc.indexing.NoopOverlordClient;
import org.apache.druid.rpc.indexing.OverlordClient;
@@ -142,6 +144,7 @@ public class WorkerTaskMonitorTest
),
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
cf,
+ new NodeAnnouncer(cf, Execs.directExecutor()),
worker
);
workerCuratorCoordinator.start();
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java
index 0ad900dd8c4..df8e34ab111 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java
@@ -26,6 +26,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.druid.curator.PotentiallyGzippedCompressionProvider;
import org.apache.druid.curator.ZkEnablementConfig;
+import org.apache.druid.curator.announcement.NodeAnnouncer;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.WorkerCuratorCoordinator;
@@ -33,6 +34,7 @@ import org.apache.druid.indexing.worker.WorkerTaskMonitor;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.easymock.EasyMock;
@@ -95,6 +97,7 @@ public class WorkerResourceTest
}, null, null, null, null),
new RemoteTaskRunnerConfig(),
cf,
+ new NodeAnnouncer(cf, Execs.directExecutor()),
worker
);
curatorCoordinator.start();
diff --git a/server/src/main/java/org/apache/druid/curator/CuratorConfig.java
b/server/src/main/java/org/apache/druid/curator/CuratorConfig.java
index 7a53ee941d7..e98a457b0f3 100644
--- a/server/src/main/java/org/apache/druid/curator/CuratorConfig.java
+++ b/server/src/main/java/org/apache/druid/curator/CuratorConfig.java
@@ -63,6 +63,9 @@ public class CuratorConfig
@JsonProperty("maxZkRetries")
private int maxZkRetries = 29;
+ @JsonProperty("pathChildrenCacheStrategy")
+ private boolean pathChildrenCacheStrategy = true;
+
public static CuratorConfig create(String hosts)
{
CuratorConfig config = new CuratorConfig();
@@ -141,4 +144,9 @@ public class CuratorConfig
{
return maxZkRetries;
}
+
+ public boolean getPathChildrenCacheStrategy()
+ {
+ return pathChildrenCacheStrategy;
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/curator/announcement/Announceable.java
b/server/src/main/java/org/apache/druid/curator/announcement/Announceable.java
new file mode 100644
index 00000000000..cecdb8dbcd5
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/curator/announcement/Announceable.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+/**
+ * The {@link Announceable} is a representation of an announcement to be made
in ZooKeeper.
+ */
+class Announceable
+{
+ /**
+ * Represents the path in ZooKeeper where the announcement will be made.
+ */
+ final String path;
+
+ /**
+ * Holds the actual data to be announced.
+ */
+ final byte[] bytes;
+
+ /**
+ * Indicates whether parent nodes should be removed if the announcement is
created successfully.
+ * This can be useful for cleaning up unused paths in ZooKeeper.
+ */
+ final boolean removeParentsIfCreated;
+
+ public Announceable(String path, byte[] bytes, boolean
removeParentsIfCreated)
+ {
+ this.path = path;
+ this.bytes = bytes;
+ this.removeParentsIfCreated = removeParentsIfCreated;
+ }
+
+ // This should be used for updates only, where removeParentsIfCreated is not
relevant.
+ public Announceable(String path, byte[] bytes)
+ {
+ // removeParentsIfCreated is irrelevant, so we can use dummy value "false".
+ this(path, bytes, false);
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java
b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java
new file mode 100644
index 00000000000..d38bf79fa38
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The {@link NodeAnnouncer} class is responsible for announcing a single node
+ * in a ZooKeeper ensemble. It creates an ephemeral node at a specified path
+ * and monitors its existence to ensure that it remains active until it is
+ * explicitly unannounced or the object is closed.
+ *
+ * <p>
+ * This class uses Apache Curator's NodeCache recipe under the hood to track a
single
+ * node, along with all of its parent's status. See {@link
PathChildrenAnnouncer} for an announcer that
+ * uses the PathChildrenCache recipe instead.
+ * </p>
+ */
+public class NodeAnnouncer implements ServiceAnnouncer
+{
+ private static final Logger log = new Logger(NodeAnnouncer.class);
+
+ private final CuratorFramework curator;
+ private final ExecutorService nodeCacheExecutor;
+
+ private final ConcurrentHashMap<String, CuratorCache> listeners = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, byte[]> announcedPaths = new
ConcurrentHashMap<>();
+
+ @GuardedBy("toAnnounce")
+ private boolean started = false;
+
+ /**
+ * This list holds paths that need to be announced. If a path is added to
this list
+ * in the {@link #announce(String, byte[], boolean)} method before the
connection to ZooKeeper is established,
+ * it will be stored here and announced later during the {@link #start}
method.
+ */
+ @GuardedBy("toAnnounce")
+ private final List<Announceable> toAnnounce = new ArrayList<>();
+
+ /**
+ * This list holds paths that need to be updated. If a path is added to this
list
+ * in the {@link #update} method before the connection to ZooKeeper is
established,
+ * it will be stored here and updated later during the {@link #start} method.
+ */
+ @GuardedBy("toAnnounce")
+ private final List<Announceable> toUpdate = new ArrayList<>();
+
+ /**
+ * This list keeps track of all the paths created by this node announcer.
+ * When the {@link #stop} method is called,
+ * the node announcer is responsible for deleting all paths stored in this
list.
+ */
+ @GuardedBy("toAnnounce")
+ private final List<String> parentsIBuilt = new CopyOnWriteArrayList<>();
+
+ public NodeAnnouncer(CuratorFramework curator, ExecutorService exec)
+ {
+ this.curator = curator;
+ this.nodeCacheExecutor = exec;
+ }
+
+ @VisibleForTesting
+ Set<String> getAddedPaths()
+ {
+ return announcedPaths.keySet();
+ }
+
+ @LifecycleStart
+ @Override
+ public void start()
+ {
+ log.debug("Starting Announcer");
+ synchronized (toAnnounce) {
+ if (started) {
+ log.debug("Announcer has already been started by another thread,
ignoring start request.");
+ return;
+ }
+
+ started = true;
+
+ for (Announceable announceable : toAnnounce) {
+ announce(announceable.path, announceable.bytes,
announceable.removeParentsIfCreated);
+ }
+ toAnnounce.clear();
+
+ for (Announceable announceable : toUpdate) {
+ update(announceable.path, announceable.bytes);
+ }
+ toUpdate.clear();
+ }
+ }
+
+ @LifecycleStop
+ @Override
+ public void stop()
+ {
+ log.debug("Stopping Announcer");
+ synchronized (toAnnounce) {
+ if (!started) {
+ log.debug("Announcer has already been stopped by another thread,
ignoring stop request.");
+ return;
+ }
+
+ started = false;
+ closeResources();
+ }
+ }
+
+ @GuardedBy("toAnnounce")
+ private void closeResources()
+ {
+ try {
+ // Close all caches...
+ CloseableUtils.closeAll(listeners.values());
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ finally {
+ nodeCacheExecutor.shutdown();
+ }
+
+ for (String announcementPath : announcedPaths.keySet()) {
+ unannounce(announcementPath);
+ }
+
+ if (!parentsIBuilt.isEmpty()) {
+ CuratorMultiTransaction transaction = curator.transaction();
+
+ ArrayList<CuratorOp> operations = new ArrayList<>();
+ for (String parent : parentsIBuilt) {
+ try {
+ operations.add(curator.transactionOp().delete().forPath(parent));
+ }
+ catch (Exception e) {
+ log.info(e, "Unable to delete parent[%s] when closing Announcer.",
parent);
+ }
+ }
+
+ try {
+ transaction.forOperations(operations);
+ }
+ catch (Exception e) {
+ log.info(e, "Unable to commit transaction when closing Announcer.");
+ }
+ }
+ }
+
+ /**
+ * Overload of {@link #announce(String, byte[], boolean)}, but removes
parent node of path after announcement.
+ */
+ @Override
+ public void announce(String path, byte[] bytes)
+ {
+ announce(path, bytes, true);
+ }
+
+ /**
+ * Announces the provided bytes at the given path.
+ *
+ * <p>
+ * Announcement using {@link NodeAnnouncer} will create an ephemeral znode
at the specified path, and listens for
+ * changes on your znode. Your znode will exist until it is unannounced, or
until {@link #stop()} is called.
+ * </p>
+ *
+ * @param path The path to announce at
+ * @param bytes The payload to announce
+ * @param removeParentIfCreated remove parent of "path" if we had created
that parent during announcement
+ */
+ @Override
+ public void announce(String path, byte[] bytes, boolean
removeParentIfCreated)
+ {
+ synchronized (toAnnounce) {
+ if (!started) {
+ log.debug("Announcer has not started yet, queuing announcement for
later processing...");
+ toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated));
+ return;
+ }
+ }
+
+ final String parentPath = ZKPaths.getPathAndNode(path).getPath();
+ byte[] announcedPayload = announcedPaths.get(path);
+
+ // If announcedPayload is null, this means that we have yet to announce
this path.
+ // There is a possibility that the parent paths do not exist, so we check
if we need to create the parent path first.
+ if (announcedPayload == null) {
+ boolean buildParentPath = false;
+ try {
+ buildParentPath = curator.checkExists().forPath(parentPath) == null;
+ }
+ catch (Exception e) {
+ log.debug(e, "Problem checking if the parent existed, ignoring.");
+ }
+
+ // Synchronize to make sure that I only create a listener once.
+ synchronized (toAnnounce) {
+ if (!listeners.containsKey(path)) {
+ final CuratorCache cache = createCacheForPath(path);
+
+ if (started) {
+ if (buildParentPath) {
+ createPath(parentPath, removeParentIfCreated);
+ }
+ startCache(cache);
+ listeners.put(path, cache);
+ }
+ }
+ }
+ }
+
+ boolean created = false;
+ synchronized (toAnnounce) {
+ if (started) {
+ byte[] oldBytes = announcedPaths.putIfAbsent(path, bytes);
+
+ if (oldBytes == null) {
+ created = true;
+ } else if (!Arrays.equals(oldBytes, bytes)) {
+ throw new IAE("Cannot reannounce different values under the same
path.");
+ }
+ }
+ }
+
+ if (created) {
+ try {
+ createAnnouncement(path, bytes);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @GuardedBy("toAnnounce")
+ private CuratorCache createCacheForPath(String path)
+ {
+ final CuratorCache cache = CuratorCache.build(curator, path,
CuratorCache.Options.SINGLE_NODE_CACHE);
+
+ cache.listenable().addListener(
+ (type, oldData, data) -> {
+ if (type == CuratorCacheListener.Type.NODE_DELETED) {
+ final byte[] previouslyAnnouncedData = announcedPaths.get(path);
+ if (previouslyAnnouncedData != null) {
+ try {
+ log.info("ZooKeeper Node[%s] dropped, reinstating...", path);
+ createAnnouncement(path, previouslyAnnouncedData);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }, nodeCacheExecutor
+ );
+
+ return cache;
+ }
+
+ @Override
+ public void update(final String path, final byte[] bytes)
+ {
+ synchronized (toAnnounce) {
+ if (!started) {
+ log.debug("Announcer has not started yet, queuing updates for later
processing...");
+ toUpdate.add(new Announceable(path, bytes));
+ return;
+ }
+
+ byte[] oldBytes = announcedPaths.get(path);
+
+ if (oldBytes == null) {
+ throw new ISE("Cannot update path[%s] that hasn't been announced!",
path);
+ }
+
+ try {
+ if (!Arrays.equals(oldBytes, bytes)) {
+ announcedPaths.put(path, bytes);
+ updateAnnouncement(path, bytes);
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private void createAnnouncement(final String path, byte[] value) throws
Exception
+ {
+
curator.create().compressed().withMode(CreateMode.EPHEMERAL).inBackground().forPath(path,
value);
+ }
+
+ private void updateAnnouncement(final String path, final byte[] value)
throws Exception
+ {
+ curator.setData().compressed().inBackground().forPath(path, value);
+ }
+
+ /**
+ * Unannounces an announcement created at path. Note that if all
announcements get removed, the Announcer
+ * will continue to have ZK watches on paths because clearing them out is a
source of ugly race conditions.
+ * <p/>
+ * If you need to completely clear all the state of what is being watched
and announced, stop() the Announcer.
+ *
+ * @param path the path to unannounce
+ */
+ @Override
+ public void unannounce(String path)
+ {
+ synchronized (toAnnounce) {
+ final byte[] value = announcedPaths.remove(path);
+
+ if (value == null) {
+ log.debug("Path[%s] not announced, cannot unannounce.", path);
+ return;
+ }
+ }
+
+ log.info("unannouncing [%s]", path);
+
+ try {
+ CuratorOp deleteOp = curator.transactionOp().delete().forPath(path);
+ curator.transaction().forOperations(deleteOp);
+ }
+ catch (KeeperException.NoNodeException e) {
+ log.info("Unannounced node[%s] that does not exist.", path);
+ }
+ catch (KeeperException.NotEmptyException e) {
+ log.warn("Unannouncing non-empty path[%s]", path);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void startCache(CuratorCache cache)
+ {
+ try {
+ cache.start();
+ }
+ catch (Throwable e) {
+ throw CloseableUtils.closeAndWrapInCatch(e, cache);
+ }
+ }
+
+ @GuardedBy("toAnnounce")
+ private void createPath(String parentPath, boolean removeParentsIfCreated)
+ {
+ try {
+ curator.create().creatingParentsIfNeeded().forPath(parentPath);
+ if (removeParentsIfCreated) {
+ // We keep track of all parents we have built, so we can delete them
later on when needed.
+ parentsIBuilt.add(parentPath);
+ }
+
+ log.debug("Created parentPath[%s], %s remove on stop.", parentPath,
removeParentsIfCreated ? "will" : "will not");
+ }
+ catch (KeeperException.NodeExistsException e) {
+ log.info(e, "Problem creating parentPath[%s], someone else created it
first?", parentPath);
+ }
+ catch (Exception e) {
+ log.error(e, "Unhandled exception when creating parentPath[%s].",
parentPath);
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java
b/server/src/main/java/org/apache/druid/curator/announcement/PathChildrenAnnouncer.java
similarity index 78%
rename from
server/src/main/java/org/apache/druid/curator/announcement/Announcer.java
rename to
server/src/main/java/org/apache/druid/curator/announcement/PathChildrenAnnouncer.java
index 4abd2404524..ec754d96be2 100644
--- a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java
+++
b/server/src/main/java/org/apache/druid/curator/announcement/PathChildrenAnnouncer.java
@@ -20,9 +20,10 @@
package org.apache.druid.curator.announcement;
import com.google.common.annotations.VisibleForTesting;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.transaction.CuratorTransaction;
-import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
@@ -37,7 +38,6 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.ArrayList;
@@ -53,20 +53,30 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
/**
- * Announces things on Zookeeper.
+ * The {@link PathChildrenAnnouncer} class manages the announcement of a node,
and watches all child
+ * and sibling nodes under the specified path in a ZooKeeper ensemble. It
monitors these nodes
+ * to ensure their existence and manage their lifecycle collectively.
+ *
+ * <p>
+ * This class uses Apache Curator's PathChildrenCache recipe under the hood to
track all znodes
+ * under the specified node's parent. See {@link NodeAnnouncer} for an
announcer that
+ * uses the NodeCache recipe instead.
+ * </p>
*/
-public class Announcer
+public class PathChildrenAnnouncer implements ServiceAnnouncer
{
- private static final Logger log = new Logger(Announcer.class);
+ private static final Logger log = new Logger(PathChildrenAnnouncer.class);
private final CuratorFramework curator;
private final PathChildrenCacheFactory factory;
private final ExecutorService pathChildrenCacheExecutor;
+ @GuardedBy("toAnnounce")
private final List<Announceable> toAnnounce = new ArrayList<>();
+ @GuardedBy("toAnnounce")
private final List<Announceable> toUpdate = new ArrayList<>();
- private final ConcurrentMap<String, PathChildrenCache> listeners = new
ConcurrentHashMap<>();
- private final ConcurrentMap<String, ConcurrentMap<String, byte[]>>
announcements = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, PathChildrenCache> listeners = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, ConcurrentHashMap<String, byte[]>>
announcements = new ConcurrentHashMap<>();
private final List<String> parentsIBuilt = new CopyOnWriteArrayList<>();
// Used for testing
@@ -74,7 +84,7 @@ public class Announcer
private boolean started = false;
- public Announcer(
+ public PathChildrenAnnouncer(
CuratorFramework curator,
ExecutorService exec
)
@@ -102,11 +112,13 @@ public class Announcer
}
@LifecycleStart
+ @Override
public void start()
{
log.debug("Starting Announcer.");
synchronized (toAnnounce) {
if (started) {
+ log.debug("Announcer has already been started by another thread,
ignoring start request.");
return;
}
@@ -125,11 +137,13 @@ public class Announcer
}
@LifecycleStop
+ @Override
public void stop()
{
log.debug("Stopping Announcer.");
synchronized (toAnnounce) {
if (!started) {
+ log.debug("Announcer has already been stopped by another thread,
ignoring stop request.");
return;
}
@@ -145,7 +159,7 @@ public class Announcer
pathChildrenCacheExecutor.shutdown();
}
- for (Map.Entry<String, ConcurrentMap<String, byte[]>> entry :
announcements.entrySet()) {
+ for (Map.Entry<String, ConcurrentHashMap<String, byte[]>> entry :
announcements.entrySet()) {
String basePath = entry.getKey();
for (String announcementPath : entry.getValue().keySet()) {
@@ -154,45 +168,56 @@ public class Announcer
}
if (!parentsIBuilt.isEmpty()) {
- CuratorTransaction transaction = curator.inTransaction();
+ CuratorMultiTransaction transaction = curator.transaction();
+
+ ArrayList<CuratorOp> operations = new ArrayList<>();
for (String parent : parentsIBuilt) {
try {
- transaction = transaction.delete().forPath(parent).and();
+ operations.add(curator.transactionOp().delete().forPath(parent));
}
catch (Exception e) {
- log.info(e, "Unable to delete parent[%s], boooo.", parent);
+ log.info(e, "Unable to delete parent[%s] when closing Announcer.",
parent);
}
}
+
try {
- ((CuratorTransactionFinal) transaction).commit();
+ transaction.forOperations(operations);
}
catch (Exception e) {
- log.info(e, "Unable to commit transaction. Please feed the
hamsters");
+ log.info(e, "Unable to commit transaction when closing Announcer.");
}
}
}
}
/**
- * Like announce(path, bytes, true).
+ * Overload of {@link #announce(String, byte[], boolean)}, but removes
parent node of path after announcement.
*/
+ @Override
public void announce(String path, byte[] bytes)
{
announce(path, bytes, true);
}
/**
- * Announces the provided bytes at the given path. Announcement means that
it will create an ephemeral node
- * and monitor it to make sure that it always exists until it is unannounced
or this object is closed.
+ * Announces the provided bytes at the given path.
+ *
+ * <p>
+ * Announcement using {@link PathChildrenAnnouncer} will create an ephemeral
znode at the specified path, and uses its parent
+ * path to watch all the siblings and children znodes of your specified
path. The watched nodes will always exist
+ * until it is unannounced, or until {@link #stop()} is called.
+ * </p>
*
* @param path The path to announce at
* @param bytes The payload to announce
- * @param removeParentIfCreated remove parent of "path" if we had created
that parent
+ * @param removeParentIfCreated remove parent of "path" if we had created
that parent during announcement
*/
+ @Override
public void announce(String path, byte[] bytes, boolean
removeParentIfCreated)
{
synchronized (toAnnounce) {
if (!started) {
+ log.debug("Announcer has not started yet, queuing announcement for
later processing...");
toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated));
return;
}
@@ -215,11 +240,7 @@ public class Announcer
log.debug(e, "Problem checking if the parent existed, ignoring.");
}
- // I don't have a watcher on this path yet, create a Map and start
watching.
- announcements.putIfAbsent(parentPath, new ConcurrentHashMap<>());
-
- // Guaranteed to be non-null, but might be a map put in there by another
thread.
- final ConcurrentMap<String, byte[]> finalSubPaths =
announcements.get(parentPath);
+ final ConcurrentHashMap<String, byte[]> finalSubPaths =
announcements.computeIfAbsent(parentPath, key -> new ConcurrentHashMap<>());
// Synchronize to make sure that I only create a listener once.
synchronized (finalSubPaths) {
@@ -329,12 +350,12 @@ public class Announcer
}
}
+ @Override
public void update(final String path, final byte[] bytes)
{
synchronized (toAnnounce) {
if (!started) {
- // removeParentsIfCreated is not relevant for updates; use dummy value
"false".
- toUpdate.add(new Announceable(path, bytes, false));
+ toUpdate.add(new Announceable(path, bytes));
return;
}
}
@@ -347,7 +368,7 @@ public class Announcer
ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath);
if (subPaths == null || subPaths.get(nodePath) == null) {
- throw new ISE("Cannot update a path[%s] that hasn't been announced!",
path);
+ throw new ISE("Cannot update path[%s] that hasn't been announced!",
path);
}
synchronized (toAnnounce) {
@@ -365,14 +386,14 @@ public class Announcer
}
}
- private String createAnnouncement(final String path, byte[] value) throws
Exception
+ private void createAnnouncement(final String path, byte[] value) throws
Exception
{
- return
curator.create().compressed().withMode(CreateMode.EPHEMERAL).inBackground().forPath(path,
value);
+
curator.create().compressed().withMode(CreateMode.EPHEMERAL).inBackground().forPath(path,
value);
}
- private Stat updateAnnouncement(final String path, final byte[] value)
throws Exception
+ private void updateAnnouncement(final String path, final byte[] value)
throws Exception
{
- return curator.setData().compressed().inBackground().forPath(path, value);
+ curator.setData().compressed().inBackground().forPath(path, value);
}
/**
@@ -383,6 +404,7 @@ public class Announcer
*
* @param path the path to unannounce
*/
+ @Override
public void unannounce(String path)
{
final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
@@ -397,10 +419,14 @@ public class Announcer
log.info("Unannouncing [%s]", path);
try {
- curator.inTransaction().delete().forPath(path).and().commit();
+ CuratorOp deleteOp = curator.transactionOp().delete().forPath(path);
+ curator.transaction().forOperations(deleteOp);
}
catch (KeeperException.NoNodeException e) {
- log.info("Node[%s] didn't exist anyway...", path);
+ log.info("Unannounced node[%s] that does not exist.", path);
+ }
+ catch (KeeperException.NotEmptyException e) {
+ log.warn("Unannouncing non-empty path[%s]", path);
}
catch (Exception e) {
throw new RuntimeException(e);
@@ -426,22 +452,11 @@ public class Announcer
}
log.debug("Created parentPath[%s], %s remove on stop.", parentPath,
removeParentsIfCreated ? "will" : "will not");
}
- catch (Exception e) {
+ catch (KeeperException.NodeExistsException e) {
log.info(e, "Problem creating parentPath[%s], someone else created it
first?", parentPath);
}
- }
-
- private static class Announceable
- {
- final String path;
- final byte[] bytes;
- final boolean removeParentsIfCreated;
-
- public Announceable(String path, byte[] bytes, boolean
removeParentsIfCreated)
- {
- this.path = path;
- this.bytes = bytes;
- this.removeParentsIfCreated = removeParentsIfCreated;
+ catch (Exception e) {
+ log.error(e, "Unhandled exception when creating parentPath[%s].",
parentPath);
}
}
}
diff --git
a/server/src/main/java/org/apache/druid/curator/announcement/ServiceAnnouncer.java
b/server/src/main/java/org/apache/druid/curator/announcement/ServiceAnnouncer.java
new file mode 100644
index 00000000000..e079e1340e9
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/curator/announcement/ServiceAnnouncer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+public interface ServiceAnnouncer
+{
+ void start();
+
+ void stop();
+
+ void announce(String path, byte[] bytes);
+
+ void announce(String path, byte[] bytes, boolean removeParentIfCreated);
+
+ void update(String path, byte[] bytes);
+
+ void unannounce(String path);
+}
diff --git
a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java
b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java
index 5b536c65d39..662706578ba 100644
---
a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java
+++
b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java
@@ -23,11 +23,12 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.curator.utils.ZKPaths;
-import org.apache.druid.curator.announcement.Announcer;
+import org.apache.druid.curator.announcement.ServiceAnnouncer;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.guice.annotations.SingleThreadedAnnouncer;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.DruidNode;
@@ -42,12 +43,16 @@ public class CuratorDruidNodeAnnouncer implements
DruidNodeAnnouncer
private static final Logger log = new
Logger(CuratorDruidNodeAnnouncer.class);
- private final Announcer announcer;
+ private final ServiceAnnouncer announcer;
private final ZkPathsConfig config;
private final ObjectMapper jsonMapper;
@Inject
- public CuratorDruidNodeAnnouncer(Announcer announcer, ZkPathsConfig config,
@Json ObjectMapper jsonMapper)
+ public CuratorDruidNodeAnnouncer(
+ @SingleThreadedAnnouncer ServiceAnnouncer announcer,
+ ZkPathsConfig config,
+ @Json ObjectMapper jsonMapper
+ )
{
this.announcer = announcer;
this.config = config;
diff --git a/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java
b/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java
index 6b0c96641a6..b6a4d472283 100644
--- a/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java
+++ b/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java
@@ -24,8 +24,13 @@ import com.google.inject.Inject;
import com.google.inject.Module;
import com.google.inject.Provides;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.druid.curator.CuratorConfig;
import org.apache.druid.curator.ZkEnablementConfig;
-import org.apache.druid.curator.announcement.Announcer;
+import org.apache.druid.curator.announcement.NodeAnnouncer;
+import org.apache.druid.curator.announcement.PathChildrenAnnouncer;
+import org.apache.druid.curator.announcement.ServiceAnnouncer;
+import org.apache.druid.guice.annotations.DirectExecutorAnnouncer;
+import org.apache.druid.guice.annotations.SingleThreadedAnnouncer;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.server.coordination.BatchDataSegmentAnnouncer;
import org.apache.druid.server.coordination.CuratorDataSegmentServerAnnouncer;
@@ -64,9 +69,28 @@ public class AnnouncerModule implements Module
}
@Provides
+ @SingleThreadedAnnouncer
@ManageLifecycleAnnouncements
- public Announcer getAnnouncer(CuratorFramework curator)
+ public ServiceAnnouncer
getAnnouncerWithSingleThreadedExecutorService(CuratorFramework curator,
CuratorConfig config)
{
- return new Announcer(curator, Execs.singleThreaded("Announcer-%s"));
+ boolean usingPathChildrenCacheAnnouncer =
config.getPathChildrenCacheStrategy();
+ if (usingPathChildrenCacheAnnouncer) {
+ return new PathChildrenAnnouncer(curator,
Execs.singleThreaded("Announcer-%s"));
+ } else {
+ return new NodeAnnouncer(curator, Execs.singleThreaded("Announcer-%s"));
+ }
+ }
+
+ @Provides
+ @DirectExecutorAnnouncer
+ @ManageLifecycleAnnouncements
+ public ServiceAnnouncer
getAnnouncerWithDirectExecutorService(CuratorFramework curator, CuratorConfig
config)
+ {
+ boolean usingPathChildrenCacheAnnouncer =
config.getPathChildrenCacheStrategy();
+ if (usingPathChildrenCacheAnnouncer) {
+ return new PathChildrenAnnouncer(curator, Execs.directExecutor());
+ } else {
+ return new NodeAnnouncer(curator, Execs.directExecutor());
+ }
}
}
diff --git
a/server/src/main/java/org/apache/druid/guice/annotations/DirectExecutorAnnouncer.java
b/server/src/main/java/org/apache/druid/guice/annotations/DirectExecutorAnnouncer.java
new file mode 100644
index 00000000000..0d675469222
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/guice/annotations/DirectExecutorAnnouncer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice.annotations;
+
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@BindingAnnotation
+@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface DirectExecutorAnnouncer
+{
+}
diff --git
a/server/src/main/java/org/apache/druid/guice/annotations/SingleThreadedAnnouncer.java
b/server/src/main/java/org/apache/druid/guice/annotations/SingleThreadedAnnouncer.java
new file mode 100644
index 00000000000..8f815301ecf
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/guice/annotations/SingleThreadedAnnouncer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice.annotations;
+
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@BindingAnnotation
+@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface SingleThreadedAnnouncer
+{
+}
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
index 7fff124ac81..ad721f4ef7c 100644
---
a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
+++
b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
@@ -32,7 +32,8 @@ import com.google.inject.Provider;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.common.utils.UUIDUtils;
import org.apache.druid.curator.ZkEnablementConfig;
-import org.apache.druid.curator.announcement.Announcer;
+import org.apache.druid.curator.announcement.ServiceAnnouncer;
+import org.apache.druid.guice.annotations.SingleThreadedAnnouncer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@@ -64,7 +65,7 @@ public class BatchDataSegmentAnnouncer implements
DataSegmentAnnouncer
private final BatchDataSegmentAnnouncerConfig config;
@Nullable //Null if zk is disabled or isSkipSegmentAnnouncementOnZk = true
- private final Announcer announcer;
+ private final ServiceAnnouncer announcer;
private final ObjectMapper jsonMapper;
private final String liveSegmentLocation;
@@ -91,7 +92,7 @@ public class BatchDataSegmentAnnouncer implements
DataSegmentAnnouncer
DruidServerMetadata server,
final BatchDataSegmentAnnouncerConfig config,
ZkPathsConfig zkPaths,
- Provider<Announcer> announcerProvider,
+ @SingleThreadedAnnouncer Provider<ServiceAnnouncer> announcerProvider,
ObjectMapper jsonMapper,
ZkEnablementConfig zkEnablementConfig
)
@@ -127,7 +128,7 @@ public class BatchDataSegmentAnnouncer implements
DataSegmentAnnouncer
DruidServerMetadata server,
final BatchDataSegmentAnnouncerConfig config,
ZkPathsConfig zkPaths,
- Announcer announcer,
+ ServiceAnnouncer announcer,
ObjectMapper jsonMapper
)
{
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/CuratorDataSegmentServerAnnouncer.java
b/server/src/main/java/org/apache/druid/server/coordination/CuratorDataSegmentServerAnnouncer.java
index 1f456bd3445..ec029ba6c9a 100644
---
a/server/src/main/java/org/apache/druid/server/coordination/CuratorDataSegmentServerAnnouncer.java
+++
b/server/src/main/java/org/apache/druid/server/coordination/CuratorDataSegmentServerAnnouncer.java
@@ -23,7 +23,8 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.curator.utils.ZKPaths;
-import org.apache.druid.curator.announcement.Announcer;
+import org.apache.druid.curator.announcement.ServiceAnnouncer;
+import org.apache.druid.guice.annotations.SingleThreadedAnnouncer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.initialization.ZkPathsConfig;
@@ -37,7 +38,7 @@ public class CuratorDataSegmentServerAnnouncer implements
DataSegmentServerAnnou
private final DruidServerMetadata server;
private final ZkPathsConfig config;
- private final Announcer announcer;
+ private final ServiceAnnouncer announcer;
private final ObjectMapper jsonMapper;
private final Object lock = new Object();
@@ -48,7 +49,7 @@ public class CuratorDataSegmentServerAnnouncer implements
DataSegmentServerAnnou
public CuratorDataSegmentServerAnnouncer(
DruidServerMetadata server,
ZkPathsConfig config,
- Announcer announcer,
+ @SingleThreadedAnnouncer ServiceAnnouncer announcer,
ObjectMapper jsonMapper
)
{
diff --git
a/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java
b/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java
index 86ee872dafe..f5ed9ba924e 100644
---
a/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java
+++
b/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java
@@ -38,7 +38,7 @@ import org.apache.druid.client.BatchServerInventoryView;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ServerView;
import org.apache.druid.curator.PotentiallyGzippedCompressionProvider;
-import org.apache.druid.curator.announcement.Announcer;
+import org.apache.druid.curator.announcement.NodeAnnouncer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
@@ -88,7 +88,7 @@ public class BatchServerInventoryViewTest
private TestingCluster testingCluster;
private CuratorFramework cf;
private ObjectMapper jsonMapper;
- private Announcer announcer;
+ private NodeAnnouncer nodeAnnouncer;
private BatchDataSegmentAnnouncer segmentAnnouncer;
private DataSegmentServerAnnouncer serverAnnouncer;
private Set<DataSegment> testSegments;
@@ -116,11 +116,8 @@ public class BatchServerInventoryViewTest
jsonMapper = TestHelper.makeJsonMapper();
- announcer = new Announcer(
- cf,
- Execs.directExecutor()
- );
- announcer.start();
+ nodeAnnouncer = new NodeAnnouncer(cf, Execs.directExecutor());
+ nodeAnnouncer.start();
DruidServerMetadata serverMetadata = new DruidServerMetadata(
"id",
@@ -144,7 +141,7 @@ public class BatchServerInventoryViewTest
serverAnnouncer = new CuratorDataSegmentServerAnnouncer(
serverMetadata,
zkPathsConfig,
- announcer,
+ nodeAnnouncer,
jsonMapper
);
serverAnnouncer.announce();
@@ -160,7 +157,7 @@ public class BatchServerInventoryViewTest
}
},
zkPathsConfig,
- announcer,
+ nodeAnnouncer,
jsonMapper
);
@@ -225,7 +222,7 @@ public class BatchServerInventoryViewTest
batchServerInventoryView.stop();
filteredBatchServerInventoryView.stop();
serverAnnouncer.unannounce();
- announcer.stop();
+ nodeAnnouncer.stop();
cf.close();
testingCluster.stop();
}
@@ -425,7 +422,10 @@ public class BatchServerInventoryViewTest
public void testSameTimeZnode() throws Exception
{
final int numThreads = INITIAL_SEGMENTS / 10;
- final ListeningExecutorService executor =
MoreExecutors.listeningDecorator(Execs.multiThreaded(numThreads,
"BatchServerInventoryViewTest-%d"));
+ final ListeningExecutorService executor =
MoreExecutors.listeningDecorator(Execs.multiThreaded(
+ numThreads,
+ "BatchServerInventoryViewTest-%d"
+ ));
segmentAnnouncer.announceSegments(testSegments);
@@ -474,7 +474,7 @@ public class BatchServerInventoryViewTest
return TEST_BASE_PATH;
}
},
- announcer,
+ nodeAnnouncer,
jsonMapper
);
List<DataSegment> segments = new ArrayList<>();
diff --git
a/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java
b/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java
new file mode 100644
index 00000000000..6841f87b314
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.announcement;
+
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
+import org.apache.curator.test.KillSession;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.druid.curator.CuratorTestBase;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.Stat;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+
+public class NodeAnnouncerTest extends CuratorTestBase
+{
+ private ExecutorService exec;
+
+ @BeforeEach
+ public void setUp() throws Exception
+ {
+ setupServerAndCurator();
+ exec = Execs.singleThreaded("test-node-announcer-sanity-%s");
+ curator.start();
+ curator.blockUntilConnected();
+ }
+
+ @AfterEach
+ public void tearDown()
+ {
+ tearDownServerAndCurator();
+ }
+
+ @Test
+ @Timeout(60_000)
+ public void testCreateParentPath() throws Exception
+ {
+ NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+ final byte[] billy = StringUtils.toUtf8("billy");
+ final String testPath = "/newParent/testPath";
+ final String parentPath = ZKPaths.getPathAndNode(testPath).getPath();
+
+ announcer.start();
+ Assertions.assertNull(curator.checkExists().forPath(parentPath), "Parent
path should not exist before announcement");
+ announcer.announce(testPath, billy);
+
+ // Wait for the announcement to be processed
+ while (curator.checkExists().forPath(testPath) == null) {
+ Thread.sleep(100);
+ }
+
+ Assertions.assertNotNull(curator.checkExists().forPath(parentPath),
"Parent path should be created");
+ Assertions.assertArrayEquals(billy,
curator.getData().decompressed().forPath(testPath));
+ announcer.stop();
+ }
+
+ @Test
+ @Timeout(60_000)
+ public void testAnnounceSamePathWithDifferentPayloadThrowsIAE() throws
Exception
+ {
+ NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+ final byte[] billy = StringUtils.toUtf8("billy");
+ final byte[] tilly = StringUtils.toUtf8("tilly");
+ final String testPath = "/testPath";
+
+ announcer.start();
+ announcer.announce(testPath, billy);
+ while (curator.checkExists().forPath(testPath) == null) {
+ Thread.sleep(100);
+ }
+ Assertions.assertArrayEquals(billy,
curator.getData().decompressed().forPath(testPath));
+
+ // Nothing wrong when we announce same payload on the same path.
+ announcer.announce(testPath, billy);
+
+ // Expect an exception when announcing a different payload
+ IAE exception = Assertions.assertThrows(IAE.class, () ->
announcer.announce(testPath, tilly));
+ Assertions.assertEquals("Cannot reannounce different values under the same
path.", exception.getMessage());
+
+ // Confirm that the announcement remains unchanged.
+ Assertions.assertArrayEquals(billy,
curator.getData().decompressed().forPath(testPath));
+ announcer.stop();
+ }
+
+ @Test
+ public void testUpdateBeforeStartingNodeAnnouncer() throws Exception
+ {
+ NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+ final byte[] billy = StringUtils.toUtf8("billy");
+ final byte[] tilly = StringUtils.toUtf8("tilly");
+ final String testPath = "/testAnnounce";
+
+ // Queue update before the announcer is started
+ announcer.update(testPath, tilly);
+ announcer.announce(testPath, billy);
+ announcer.start();
+
+ // Verify that the update took precedence
+ Assertions.assertArrayEquals(tilly,
curator.getData().decompressed().forPath(testPath));
+ announcer.stop();
+ }
+
+ @Test
+ public void testUpdateSuccessfully() throws Exception
+ {
+ NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+ final byte[] billy = StringUtils.toUtf8("billy");
+ final byte[] tilly = StringUtils.toUtf8("tilly");
+ final String testPath = "/testUpdate";
+
+ announcer.start();
+ announcer.announce(testPath, billy);
+ Assertions.assertArrayEquals(billy,
curator.getData().decompressed().forPath(testPath));
+
+ // Update with the same payload: nothing should change.
+ announcer.update(testPath, billy);
+ Assertions.assertArrayEquals(billy,
curator.getData().decompressed().forPath(testPath));
+
+ // Update with a new payload.
+ announcer.update(testPath, tilly);
+ Assertions.assertArrayEquals(tilly,
curator.getData().decompressed().forPath(testPath));
+ announcer.stop();
+ }
+
+ @Test
+ public void testUpdateNonExistentPath()
+ {
+ NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+ final byte[] billy = StringUtils.toUtf8("billy");
+ final String testPath = "/testUpdate";
+
+ announcer.start();
+
+ ISE exception = Assertions.assertThrows(ISE.class, () ->
announcer.update(testPath, billy));
+ Assertions.assertEquals("Cannot update path[/testUpdate] that hasn't been
announced!", exception.getMessage());
+ announcer.stop();
+ }
+
+ @Test
+ @Timeout(60_000)
+ public void testSanity() throws Exception
+ {
+ NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+
+ final byte[] billy = StringUtils.toUtf8("billy");
+ final String testPath1 = "/test1";
+ final String testPath2 = "/somewhere/test2";
+ announcer.announce(testPath1, billy);
+
+ Assertions.assertNull(curator.checkExists().forPath(testPath1), "/test1
does not exist before announcer start");
+ Assertions.assertNull(curator.checkExists().forPath(testPath2),
"/somewhere/test2 does not exist before announcer start");
+
+ announcer.start();
+ while (!announcer.getAddedPaths().contains("/test1")) {
+ Thread.sleep(100);
+ }
+
+ try {
+ Assertions.assertArrayEquals(billy,
curator.getData().decompressed().forPath(testPath1), "/test1 has data");
+ Assertions.assertNull(curator.checkExists().forPath(testPath2),
"/somewhere/test2 still does not exist");
+
+ announcer.announce(testPath2, billy);
+
+ Assertions.assertArrayEquals(billy,
curator.getData().decompressed().forPath(testPath1), "/test1 still has data");
+ Assertions.assertArrayEquals(billy,
curator.getData().decompressed().forPath(testPath2), "/somewhere/test2 has
data");
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ curator.getCuratorListenable().addListener((client, event) -> {
+ if (event.getType() == CuratorEventType.CREATE &&
event.getPath().equals(testPath1)) {
+ latch.countDown();
+ }
+ });
+ final CuratorOp deleteOp =
curator.transactionOp().delete().forPath(testPath1);
+ final Collection<CuratorTransactionResult> results =
curator.transaction().forOperations(deleteOp);
+ Assertions.assertEquals(1, results.size(), "Expected one result from the
delete op");
+ final CuratorTransactionResult result = results.iterator().next();
+ Assertions.assertEquals(Code.OK.intValue(), result.getError(), "Expected
OK code on delete");
+
+ Assertions.assertTrue(timing.forWaiting().awaitLatch(latch), "Wait for
/test1 to be recreated");
+
+ Assertions.assertArrayEquals(billy,
curator.getData().decompressed().forPath(testPath1), "Expected /test1 data to
be restored");
+ Assertions.assertArrayEquals(billy,
curator.getData().decompressed().forPath(testPath2), "Expected /somewhere/test2
data to remain");
+
+ announcer.unannounce(testPath1);
+ Assertions.assertNull(curator.checkExists().forPath(testPath1),
"Expected /test1 to be unannounced");
+ Assertions.assertArrayEquals(billy,
curator.getData().decompressed().forPath(testPath2), "Expected /somewhere/test2
to remain");
+ }
+ finally {
+ announcer.stop();
+ }
+
+ Assertions.assertNull(curator.checkExists().forPath(testPath1), "Expected
/test1 to remain unannounced");
+ Assertions.assertNull(curator.checkExists().forPath(testPath2), "Expected
/somewhere/test2 to be unannounced");
+ }
+
+ @Test
+ @Timeout(60_000)
+ public void testSessionKilled() throws Exception
+ {
+ NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+ try {
+ CuratorOp createOp =
curator.transactionOp().create().forPath("/somewhere");
+ curator.transaction().forOperations(createOp);
+ announcer.start();
+
+ final byte[] billy = StringUtils.toUtf8("billy");
+ final String testPath1 = "/test1";
+ final String testPath2 = "/somewhere/test2";
+ final String[] paths = new String[]{testPath1, testPath2};
+ announcer.announce(testPath1, billy);
+ announcer.announce(testPath2, billy);
+
+ Assertions.assertArrayEquals(billy,
curator.getData().decompressed().forPath(testPath1));
+ Assertions.assertArrayEquals(billy,
curator.getData().decompressed().forPath(testPath2));
+
+ final CountDownLatch latch = createCountdownLatchForPaths(paths);
+ KillSession.kill(curator.getZookeeperClient().getZooKeeper(),
server.getConnectString());
+
+ Assertions.assertTrue(timing.forWaiting().awaitLatch(latch), "Await
latch after killing session");
+
+ Assertions.assertArrayEquals(billy,
curator.getData().decompressed().forPath(testPath1));
+ Assertions.assertArrayEquals(billy,
curator.getData().decompressed().forPath(testPath2));
+
+ announcer.stop();
+
+ while ((curator.checkExists().forPath(testPath1) != null) ||
+ (curator.checkExists().forPath(testPath2) != null)) {
+ Thread.sleep(100);
+ }
+
+ Assertions.assertNull(curator.checkExists().forPath(testPath1));
+ Assertions.assertNull(curator.checkExists().forPath(testPath2));
+ }
+ finally {
+ announcer.stop();
+ }
+ }
+
+ @Test
+ public void testRemovesParentIfCreated() throws Exception
+ {
+ NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+
+ final byte[] billy = StringUtils.toUtf8("billy");
+ final String testPath = "/somewhere/test";
+ final String parent = ZKPaths.getPathAndNode(testPath).getPath();
+
+ announcer.start();
+ try {
+ Assertions.assertNull(curator.checkExists().forPath(parent));
+
+ awaitAnnounce(announcer, testPath, billy, true);
+
+ Assertions.assertNotNull(curator.checkExists().forPath(parent));
+ }
+ finally {
+ announcer.stop();
+ }
+
+ Assertions.assertNull(curator.checkExists().forPath(parent));
+ }
+
+ @Test
+ public void testLeavesBehindParentPathIfAlreadyExists() throws Exception
+ {
+ NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+
+ final byte[] billy = StringUtils.toUtf8("billy");
+ final String testPath = "/somewhere/test2";
+ final String parent = ZKPaths.getPathAndNode(testPath).getPath();
+
+ curator.create().forPath(parent);
+ final Stat initialStat = curator.checkExists().forPath(parent);
+
+ announcer.start();
+ try {
+ Assertions.assertEquals(initialStat.getMzxid(),
curator.checkExists().forPath(parent).getMzxid());
+
+ awaitAnnounce(announcer, testPath, billy, true);
+
+ Assertions.assertEquals(initialStat.getMzxid(),
curator.checkExists().forPath(parent).getMzxid());
+ }
+ finally {
+ announcer.stop();
+ }
+
+ Assertions.assertEquals(initialStat.getMzxid(),
curator.checkExists().forPath(parent).getMzxid());
+ }
+
+ @Test
+ public void testLeavesParentPathsUntouchedWhenInstructed() throws Exception
+ {
+ NodeAnnouncer announcer = new NodeAnnouncer(curator, exec);
+
+ final byte[] billy = StringUtils.toUtf8("billy");
+ final String testPath = "/somewhere/test2";
+ final String parent = ZKPaths.getPathAndNode(testPath).getPath();
+
+ announcer.start();
+ try {
+ Assertions.assertNull(curator.checkExists().forPath(parent));
+
+ awaitAnnounce(announcer, testPath, billy, false);
+
+ Assertions.assertNotNull(curator.checkExists().forPath(parent));
+ }
+ finally {
+ announcer.stop();
+ }
+
+ Assertions.assertNotNull(curator.checkExists().forPath(parent));
+ }
+
+ private void awaitAnnounce(
+ final NodeAnnouncer announcer,
+ final String path,
+ final byte[] bytes,
+ boolean removeParentsIfCreated
+ ) throws InterruptedException
+ {
+ final CountDownLatch latch = createCountdownLatchForPaths(path);
+ announcer.announce(path, bytes, removeParentsIfCreated);
+ latch.await();
+ }
+
+ private CountDownLatch createCountdownLatchForPaths(String... paths)
+ {
+ final CountDownLatch latch = new CountDownLatch(paths.length);
+ curator.getCuratorListenable().addListener((client, event) -> {
+ if (event.getType() == CuratorEventType.CREATE &&
Arrays.asList(paths).contains(event.getPath())) {
+ latch.countDown();
+ }
+ });
+
+ return latch;
+ }
+}
diff --git
a/server/src/test/java/org/apache/druid/curator/announcement/AnnouncerTest.java
b/server/src/test/java/org/apache/druid/curator/announcement/PathChildrenAnnouncerTest.java
similarity index 78%
rename from
server/src/test/java/org/apache/druid/curator/announcement/AnnouncerTest.java
rename to
server/src/test/java/org/apache/druid/curator/announcement/PathChildrenAnnouncerTest.java
index e12d8bc47bb..095a38f69ce 100644
---
a/server/src/test/java/org/apache/druid/curator/announcement/AnnouncerTest.java
+++
b/server/src/test/java/org/apache/druid/curator/announcement/PathChildrenAnnouncerTest.java
@@ -19,7 +19,6 @@
package org.apache.druid.curator.announcement;
-import com.google.common.collect.Sets;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
@@ -39,16 +38,17 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.util.Arrays;
import java.util.Collection;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
/**
+ *
*/
-public class AnnouncerTest extends CuratorTestBase
+public class PathChildrenAnnouncerTest extends CuratorTestBase
{
- private static final Logger log = new Logger(AnnouncerTest.class);
+ private static final Logger log = new
Logger(PathChildrenAnnouncerTest.class);
private ExecutorService exec;
@Before
@@ -56,6 +56,8 @@ public class AnnouncerTest extends CuratorTestBase
{
setupServerAndCurator();
exec = Execs.singleThreaded("test-announcer-sanity-%s");
+ curator.start();
+ curator.blockUntilConnected();
}
@After
@@ -67,9 +69,7 @@ public class AnnouncerTest extends CuratorTestBase
@Test(timeout = 60_000L)
public void testSanity() throws Exception
{
- curator.start();
- curator.blockUntilConnected();
- Announcer announcer = new Announcer(curator, exec);
+ PathChildrenAnnouncer announcer = new PathChildrenAnnouncer(curator, exec);
announcer.initializeAddedChildren();
final byte[] billy = StringUtils.toUtf8("billy");
@@ -98,19 +98,8 @@ public class AnnouncerTest extends CuratorTestBase
curator.getData().decompressed().forPath(testPath2)
);
- final CountDownLatch latch = new CountDownLatch(1);
- curator.getCuratorListenable().addListener(
- new CuratorListener()
- {
- @Override
- public void eventReceived(CuratorFramework client, CuratorEvent
event)
- {
- if (event.getType() == CuratorEventType.CREATE &&
event.getPath().equals(testPath1)) {
- latch.countDown();
- }
- }
- }
- );
+ final CountDownLatch latch = createCountdownLatchForPaths(testPath1);
+
final CuratorOp deleteOp =
curator.transactionOp().delete().forPath(testPath1);
final Collection<CuratorTransactionResult> results =
curator.transaction().forOperations(deleteOp);
Assert.assertEquals(1, results.size());
@@ -149,39 +138,24 @@ public class AnnouncerTest extends CuratorTestBase
@Test(timeout = 60_000L)
public void testSessionKilled() throws Exception
{
- curator.start();
- curator.blockUntilConnected();
- Announcer announcer = new Announcer(curator, exec);
+ PathChildrenAnnouncer announcer = new PathChildrenAnnouncer(curator, exec);
try {
- curator.inTransaction().create().forPath("/somewhere").and().commit();
+ CuratorOp createOp =
curator.transactionOp().create().forPath("/somewhere");
+ curator.transaction().forOperations(createOp);
announcer.start();
final byte[] billy = StringUtils.toUtf8("billy");
final String testPath1 = "/test1";
final String testPath2 = "/somewhere/test2";
- final Set<String> paths = Sets.newHashSet(testPath1, testPath2);
+ final String[] paths = new String[]{testPath1, testPath2};
announcer.announce(testPath1, billy);
announcer.announce(testPath2, billy);
Assert.assertArrayEquals(billy,
curator.getData().decompressed().forPath(testPath1));
Assert.assertArrayEquals(billy,
curator.getData().decompressed().forPath(testPath2));
- final CountDownLatch latch = new CountDownLatch(1);
- curator.getCuratorListenable().addListener(
- new CuratorListener()
- {
- @Override
- public void eventReceived(CuratorFramework client, CuratorEvent
event)
- {
- if (event.getType() == CuratorEventType.CREATE) {
- paths.remove(event.getPath());
- if (paths.isEmpty()) {
- latch.countDown();
- }
- }
- }
- }
- );
+ final CountDownLatch latch = createCountdownLatchForPaths(paths);
+
KillSession.kill(curator.getZookeeperClient().getZooKeeper(),
server.getConnectString());
Assert.assertTrue(timing.forWaiting().awaitLatch(latch));
@@ -204,11 +178,9 @@ public class AnnouncerTest extends CuratorTestBase
}
@Test
- public void testCleansUpItsLittleTurdlings() throws Exception
+ public void testRemovesParentIfCreated() throws Exception
{
- curator.start();
- curator.blockUntilConnected();
- Announcer announcer = new Announcer(curator, exec);
+ PathChildrenAnnouncer announcer = new PathChildrenAnnouncer(curator, exec);
final byte[] billy = StringUtils.toUtf8("billy");
final String testPath = "/somewhere/test2";
@@ -230,14 +202,12 @@ public class AnnouncerTest extends CuratorTestBase
}
@Test
- public void testLeavesBehindTurdlingsThatAlreadyExisted() throws Exception
+ public void testLeavesBehindParentPathIfAlreadyExists() throws Exception
{
- curator.start();
- curator.blockUntilConnected();
- Announcer announcer = new Announcer(curator, exec);
+ PathChildrenAnnouncer announcer = new PathChildrenAnnouncer(curator, exec);
final byte[] billy = StringUtils.toUtf8("billy");
- final String testPath = "/somewhere/test2";
+ final String testPath = "/somewhere/test";
final String parent = ZKPaths.getPathAndNode(testPath).getPath();
curator.create().forPath(parent);
@@ -259,14 +229,12 @@ public class AnnouncerTest extends CuratorTestBase
}
@Test
- public void testLeavesBehindTurdlingsWhenToldTo() throws Exception
+ public void testLeavesParentPathsUntouchedWhenInstructed() throws Exception
{
- curator.start();
- curator.blockUntilConnected();
- Announcer announcer = new Announcer(curator, exec);
+ PathChildrenAnnouncer announcer = new PathChildrenAnnouncer(curator, exec);
final byte[] billy = StringUtils.toUtf8("billy");
- final String testPath = "/somewhere/test2";
+ final String testPath = "/somewhere/test";
final String parent = ZKPaths.getPathAndNode(testPath).getPath();
announcer.start();
@@ -285,26 +253,33 @@ public class AnnouncerTest extends CuratorTestBase
}
private void awaitAnnounce(
- final Announcer announcer,
+ final PathChildrenAnnouncer announcer,
final String path,
final byte[] bytes,
boolean removeParentsIfCreated
) throws InterruptedException
{
- final CountDownLatch latch = new CountDownLatch(1);
+ CountDownLatch latch = createCountdownLatchForPaths(path);
+ announcer.announce(path, bytes, removeParentsIfCreated);
+ latch.await();
+ }
+
+ private CountDownLatch createCountdownLatchForPaths(String... path)
+ {
+ final CountDownLatch latch = new CountDownLatch(path.length);
curator.getCuratorListenable().addListener(
new CuratorListener()
{
@Override
public void eventReceived(CuratorFramework client, CuratorEvent
event)
{
- if (event.getType() == CuratorEventType.CREATE &&
event.getPath().equals(path)) {
+ if (event.getType() == CuratorEventType.CREATE &&
Arrays.asList(path).contains(event.getPath())) {
latch.countDown();
}
}
}
);
- announcer.announce(path, bytes, removeParentsIfCreated);
- latch.await();
+
+ return latch;
}
}
diff --git
a/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java
b/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java
index 93466876e6e..7d9368a8e1e 100644
---
a/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java
+++
b/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.curator.CuratorTestBase;
-import org.apache.druid.curator.announcement.Announcer;
+import org.apache.druid.curator.announcement.NodeAnnouncer;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.NodeRole;
@@ -70,10 +70,7 @@ public class CuratorDruidNodeAnnouncerAndDiscoveryTest
extends CuratorTestBase
curator.start();
curator.blockUntilConnected();
- Announcer announcer = new Announcer(
- curator,
- Execs.directExecutor()
- );
+ NodeAnnouncer announcer = new NodeAnnouncer(curator,
Execs.directExecutor());
announcer.start();
CuratorDruidNodeAnnouncer druidNodeAnnouncer = new
CuratorDruidNodeAnnouncer(
diff --git
a/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java
b/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java
index 98644cdc565..c3e04f04d6d 100644
---
a/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java
@@ -31,7 +31,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.druid.curator.PotentiallyGzippedCompressionProvider;
-import org.apache.druid.curator.announcement.Announcer;
+import org.apache.druid.curator.announcement.NodeAnnouncer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.segment.TestHelper;
@@ -284,7 +284,10 @@ public class BatchDataSegmentAnnouncerTest
List<String> zNodes = cf.getChildren().forPath(TEST_SEGMENTS_PATH);
for (String zNode : zNodes) {
- DataSegment announcedSegment =
Iterables.getOnlyElement(segmentReader.read(JOINER.join(TEST_SEGMENTS_PATH,
zNode)));
+ DataSegment announcedSegment =
Iterables.getOnlyElement(segmentReader.read(JOINER.join(
+ TEST_SEGMENTS_PATH,
+ zNode
+ )));
Assert.assertEquals(announcedSegment, firstSegment);
Assert.assertTrue(announcedSegment.getDimensions().isEmpty());
Assert.assertTrue(announcedSegment.getMetrics().isEmpty());
@@ -307,7 +310,10 @@ public class BatchDataSegmentAnnouncerTest
List<String> zNodes = cf.getChildren().forPath(TEST_SEGMENTS_PATH);
for (String zNode : zNodes) {
- DataSegment announcedSegment =
Iterables.getOnlyElement(segmentReader.read(JOINER.join(TEST_SEGMENTS_PATH,
zNode)));
+ DataSegment announcedSegment =
Iterables.getOnlyElement(segmentReader.read(JOINER.join(
+ TEST_SEGMENTS_PATH,
+ zNode
+ )));
Assert.assertEquals(announcedSegment, firstSegment);
Assert.assertNull(announcedSegment.getLoadSpec());
}
@@ -402,7 +408,8 @@ public class BatchDataSegmentAnnouncerTest
segmentAnnouncer.announceSegmentSchemas(
taskId,
new SegmentSchemas(Collections.singletonList(absoluteSchema1)),
- new SegmentSchemas(Collections.singletonList(absoluteSchema1)));
+ new SegmentSchemas(Collections.singletonList(absoluteSchema1))
+ );
ChangeRequestsSnapshot<DataSegmentChangeRequest> snapshot;
@@ -618,7 +625,7 @@ public class BatchDataSegmentAnnouncerTest
}
}
- private static class TestAnnouncer extends Announcer
+ private static class TestAnnouncer extends NodeAnnouncer
{
private final ConcurrentHashMap<String, ConcurrentHashMap<byte[],
AtomicInteger>> numPathAnnounced = new ConcurrentHashMap<>();
@@ -630,7 +637,9 @@ public class BatchDataSegmentAnnouncerTest
@Override
public void announce(String path, byte[] bytes, boolean
removeParentIfCreated)
{
- numPathAnnounced.computeIfAbsent(path, k -> new
ConcurrentHashMap<>()).computeIfAbsent(bytes, k -> new
AtomicInteger(0)).incrementAndGet();
+ numPathAnnounced.computeIfAbsent(path, k -> new ConcurrentHashMap<>())
+ .computeIfAbsent(bytes, k -> new AtomicInteger(0))
+ .incrementAndGet();
super.announce(path, bytes, removeParentIfCreated);
}
}
diff --git a/website/.spelling b/website/.spelling
index 39de8156d95..62e3d2bec31 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -198,6 +198,7 @@ Murmur3
MVCC
MV_TO_ARRAY
NFS
+NodeCache
OCF
OIDC
OLAP
@@ -209,6 +210,7 @@ OutputStream
ParAccel
ParseSpec
ParseSpecs
+PathChildrenCache
Protobuf
protobuf
pull-deps
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]