This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 34c04daa9f Fix infinite iteration in http sync monitoring (#13731)
34c04daa9f is described below
commit 34c04daa9f731c0d3230062d9977fd67dd42a6f0
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Wed Feb 8 15:14:11 2023 +0530
Fix infinite iteration in http sync monitoring (#13731)
* Fix infinite iteration in http task runner
* Fix infinite iteration in http server view
* Add tests
---
.../overlord/hrtr/HttpRemoteTaskRunner.java | 46 ++++++++------
.../overlord/hrtr/HttpRemoteTaskRunnerTest.java | 72 ++++++++++++++++++++++
.../druid/client/HttpServerInventoryView.java | 64 +++++++++++--------
.../druid/client/HttpServerInventoryViewTest.java | 44 +++++++++++++
4 files changed, 182 insertions(+), 44 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 6eb1a9c28d..65f2477080 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -31,6 +31,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -574,7 +575,8 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
);
}
- private void addWorker(final Worker worker)
+ @VisibleForTesting
+ void addWorker(final Worker worker)
{
synchronized (workers) {
log.info("Worker[%s] reportin' for duty!", worker.getHost());
@@ -752,23 +754,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
log.debug("Running the Sync Monitoring.");
try {
- for (Map.Entry<String, WorkerHolder> e : workers.entrySet()) {
- WorkerHolder workerHolder = e.getValue();
- if (!workerHolder.getUnderlyingSyncer().isOK()) {
- synchronized (workers) {
- // check again that server is still there and only then reset.
- if (workers.containsKey(e.getKey())) {
- log.makeAlert(
- "Worker[%s] is not syncing properly. Current state is [%s]. Resetting it.",
- workerHolder.getWorker().getHost(),
- workerHolder.getUnderlyingSyncer().getDebugInfo()
- ).emit();
- removeWorker(workerHolder.getWorker());
- addWorker(workerHolder.getWorker());
- }
- }
- }
- }
+ syncMonitoring();
}
catch (Exception ex) {
if (ex instanceof InterruptedException) {
@@ -784,6 +770,30 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
);
}
+ @VisibleForTesting
+ void syncMonitoring()
+ {
+ // Ensure that the collection is not being modified during iteration. Iterate over a copy
+ final Set<Map.Entry<String, WorkerHolder>> workerEntrySet = ImmutableSet.copyOf(workers.entrySet());
+ for (Map.Entry<String, WorkerHolder> e : workerEntrySet) {
+ WorkerHolder workerHolder = e.getValue();
+ if (!workerHolder.getUnderlyingSyncer().isOK()) {
+ synchronized (workers) {
+ // check again that server is still there and only then reset.
+ if (workers.containsKey(e.getKey())) {
+ log.makeAlert(
+ "Worker[%s] is not syncing properly. Current state is [%s]. Resetting it.",
+ workerHolder.getWorker().getHost(),
+ workerHolder.getUnderlyingSyncer().getDebugInfo()
+ ).emit();
+ removeWorker(workerHolder.getWorker());
+ addWorker(workerHolder.getWorker());
+ }
+ }
+ }
+ }
+ }
+
/**
* This method returns the debugging information exposed by {@link HttpRemoteTaskRunnerResource} and meant
* for that use only. It must not be used for any other purpose.
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index e78b78517a..fe1b0ca498 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -58,6 +58,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.coordination.ChangeRequestHttpSyncer;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -70,6 +71,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -1668,6 +1670,52 @@ public class HttpRemoteTaskRunnerTest
}
+ @Test(timeout = 60_000L)
+ public void testSyncMonitoring_finiteIteration()
+ {
+ TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+ DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+ EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+ .andReturn(druidNodeDiscovery);
+ EasyMock.replay(druidNodeDiscoveryProvider);
+
+ HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
+ TestHelper.makeJsonMapper(),
+ new HttpRemoteTaskRunnerConfig(),
+ EasyMock.createNiceMock(HttpClient.class),
+ DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+ new NoopProvisioningStrategy<>(),
+ druidNodeDiscoveryProvider,
+ EasyMock.createMock(TaskStorage.class),
+ EasyMock.createNiceMock(CuratorFramework.class),
+ new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
+ new NoopServiceEmitter()
+ )
+ {
+ @Override
+ protected WorkerHolder createWorkerHolder(
+ ObjectMapper smileMapper,
+ HttpClient httpClient,
+ HttpRemoteTaskRunnerConfig config,
+ ScheduledExecutorService workersSyncExec,
+ WorkerHolder.Listener listener,
+ Worker worker,
+ List<TaskAnnouncement> knownAnnouncements
+ )
+ {
+ return createNonSyncingWorkerHolder(worker);
+ }
+ };
+
+ taskRunner.start();
+ taskRunner.addWorker(createWorker("abc"));
+ taskRunner.addWorker(createWorker("xyz"));
+ taskRunner.addWorker(createWorker("lol"));
+ Assert.assertEquals(3, taskRunner.getWorkerSyncerDebugInfo().size());
+ taskRunner.syncMonitoring();
+ Assert.assertEquals(3, taskRunner.getWorkerSyncerDebugInfo().size());
+ }
+
public static HttpRemoteTaskRunner createTaskRunnerForTestTaskAddedOrUpdated(
TaskStorage taskStorage,
List<Object> listenerNotificationsAccumulator
@@ -1730,6 +1778,30 @@ public class HttpRemoteTaskRunnerTest
return taskRunner;
}
+ private Worker createWorker(String host)
+ {
+ Worker worker = EasyMock.createMock(Worker.class);
+ EasyMock.expect(worker.getHost()).andReturn(host).anyTimes();
+ EasyMock.replay(worker);
+ return worker;
+ }
+
+ private WorkerHolder createNonSyncingWorkerHolder(Worker worker)
+ {
+ ChangeRequestHttpSyncer syncer = EasyMock.createMock(ChangeRequestHttpSyncer.class);
+ EasyMock.expect(syncer.isOK()).andReturn(false).anyTimes();
+ EasyMock.expect(syncer.getDebugInfo()).andReturn(Collections.emptyMap()).anyTimes();
+ WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class);
+ EasyMock.expect(workerHolder.getUnderlyingSyncer()).andReturn(syncer).anyTimes();
+ EasyMock.expect(workerHolder.getWorker()).andReturn(worker).anyTimes();
+ workerHolder.start();
+ EasyMock.expectLastCall();
+ workerHolder.stop();
+ EasyMock.expectLastCall();
+ EasyMock.replay(syncer, workerHolder);
+ return workerHolder;
+ }
+
private static WorkerHolder createWorkerHolder(
ObjectMapper smileMapper,
HttpClient httpClient,
diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
index decab1f7cc..893d455dc7 100644
--- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
+++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
@@ -21,11 +21,13 @@ package org.apache.druid.client;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.net.HostAndPort;
import org.apache.druid.concurrent.LifecycleLock;
@@ -58,6 +60,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
@@ -375,7 +378,8 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
);
}
- private void serverAdded(DruidServer server)
+ @VisibleForTesting
+ void serverAdded(DruidServer server)
{
synchronized (servers) {
DruidServerHolder holder = servers.get(server.getName());
@@ -430,31 +434,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
log.debug("Running the Sync Monitoring.");
try {
- for (Map.Entry<String, DruidServerHolder> e : servers.entrySet()) {
- DruidServerHolder serverHolder = e.getValue();
- if (!serverHolder.syncer.isOK()) {
- synchronized (servers) {
- // check again that server is still there and only then reset.
- if (servers.containsKey(e.getKey())) {
- log.makeAlert(
- "Server[%s] is not syncing properly. Current state is [%s]. Resetting it.",
- serverHolder.druidServer.getName(),
- serverHolder.syncer.getDebugInfo()
- ).emit();
- serverRemoved(serverHolder.druidServer);
- serverAdded(new DruidServer(
- serverHolder.druidServer.getName(),
- serverHolder.druidServer.getHostAndPort(),
- serverHolder.druidServer.getHostAndTlsPort(),
- serverHolder.druidServer.getMaxSize(),
- serverHolder.druidServer.getType(),
- serverHolder.druidServer.getTier(),
- serverHolder.druidServer.getPriority()
- ));
- }
- }
- }
- }
+ syncMonitoring();
}
catch (Exception ex) {
if (ex instanceof InterruptedException) {
@@ -470,6 +450,38 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
);
}
+ @VisibleForTesting
+ void syncMonitoring()
+ {
+ // Ensure that the collection is not being modified during iteration. Iterate over a copy
+ final Set<Map.Entry<String, DruidServerHolder>> serverEntrySet = ImmutableSet.copyOf(servers.entrySet());
+ for (Map.Entry<String, DruidServerHolder> e : serverEntrySet) {
+ DruidServerHolder serverHolder = e.getValue();
+ if (!serverHolder.syncer.isOK()) {
+ synchronized (servers) {
+ // check again that server is still there and only then reset.
+ if (servers.containsKey(e.getKey())) {
+ log.makeAlert(
+ "Server[%s] is not syncing properly. Current state is [%s]. Resetting it.",
+ serverHolder.druidServer.getName(),
+ serverHolder.syncer.getDebugInfo()
+ ).emit();
+ serverRemoved(serverHolder.druidServer);
+ serverAdded(new DruidServer(
+ serverHolder.druidServer.getName(),
+ serverHolder.druidServer.getHostAndPort(),
+ serverHolder.druidServer.getHostAndTlsPort(),
+ serverHolder.druidServer.getMaxSize(),
+ serverHolder.druidServer.getType(),
+ serverHolder.druidServer.getTier(),
+ serverHolder.druidServer.getPriority()
+ ));
+ }
+ }
+ }
+ }
+ }
+
@Override
public boolean isStarted()
{
diff --git a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
index 896f90d67d..b08db90c2c 100644
--- a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
@@ -273,6 +273,50 @@ public class HttpServerInventoryViewTest
httpServerInventoryView.stop();
}
+ @Test(timeout = 60_000L)
+ public void testSyncMonitoring()
+ {
+ ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
+
+ TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+ DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+ EasyMock.expect(druidNodeDiscoveryProvider.getForService(DataNodeService.DISCOVERY_SERVICE_KEY))
+ .andReturn(druidNodeDiscovery);
+ EasyMock.replay(druidNodeDiscoveryProvider);
+
+ TestHttpClient httpClient = new TestHttpClient(ImmutableList.of());
+
+ HttpServerInventoryView httpServerInventoryView = new HttpServerInventoryView(
+ jsonMapper,
+ httpClient,
+ druidNodeDiscoveryProvider,
+ (pair) -> !pair.rhs.getDataSource().equals("non-loading-datasource"),
+ new HttpServerInventoryViewConfig(null, null, null),
+ "test"
+ );
+
+ httpServerInventoryView.start();
+ httpServerInventoryView.serverAdded(makeServer("abc.com:8080"));
+ httpServerInventoryView.serverAdded(makeServer("xyz.com:8080"));
+ httpServerInventoryView.serverAdded(makeServer("lol.com:8080"));
+ Assert.assertEquals(3, httpServerInventoryView.getDebugInfo().size());
+ httpServerInventoryView.syncMonitoring();
+ Assert.assertEquals(3, httpServerInventoryView.getDebugInfo().size());
+ }
+
+ private DruidServer makeServer(String host)
+ {
+ return new DruidServer(
+ host,
+ host,
+ host,
+ 100_000_000L,
+ ServerType.HISTORICAL,
+ "__default_tier",
+ 50
+ );
+ }
+
private static class TestDruidNodeDiscovery implements DruidNodeDiscovery
{
Listener listener;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]