This is an automated email from the ASF dual-hosted git repository.
adarshsanjeev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 1aecefb7605 Add metrics for Historical cloning (#17956)
1aecefb7605 is described below
commit 1aecefb760579b1596e097be6316dc5698d4bba4
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Fri May 2 13:50:33 2025 +0530
Add metrics for Historical cloning (#17956)
Add metrics for monitoring Historical cloning and Broker config syncing. The new
metrics are:
- config/brokerSync/time
- config/brokerSync/total/time
- config/brokerSync/error
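Also addresses follow-up review comments from
https://github.com/apache/druid/pull/17899, including:
- Fix the documentation of the `cloneQueryMode` query context parameter
  (`clonesPreferred` is now documented as `preferClones`)
- Add a more graceful shutdown: the Coordinator dynamic config syncer is now
  lifecycle-managed and stops its executor on lifecycle stop
- Rename some fields for consistency (e.g. `syncTimeInMs` is now
  `lastSyncTimestampMillis`)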
---
docs/api-reference/service-status-api.md | 2 +-
docs/querying/query-context.md | 2 +-
.../client/BrokerViewOfCoordinatorConfig.java | 2 +-
.../server/coordinator/duty/CloneHistoricals.java | 4 +-
.../druid/server/coordinator/stats/Stats.java | 10 ++
.../apache/druid/server/http/BrokerSyncStatus.java | 16 ++--
.../http/CoordinatorDynamicConfigSyncer.java | 65 ++++++++++---
.../http/CoordinatorDynamicConfigSyncerTest.java | 106 +++++++++++++++++++++
.../java/org/apache/druid/cli/CliCoordinator.java | 2 +-
9 files changed, 184 insertions(+), 25 deletions(-)
diff --git a/docs/api-reference/service-status-api.md
b/docs/api-reference/service-status-api.md
index 48679658d44..47d2a5a6d3f 100644
--- a/docs/api-reference/service-status-api.md
+++ b/docs/api-reference/service-status-api.md
@@ -782,7 +782,7 @@ Host: http://COORDINATOR_IP:COORDINATOR_PORT
{
"host": "localhost",
"port": 8082,
- "syncTimeInMs": 1745756337472
+ "lastSyncTimestampMillis": 1745756337472
}
]
}
diff --git a/docs/querying/query-context.md b/docs/querying/query-context.md
index 78aec8f9f28..0a277949fdd 100644
--- a/docs/querying/query-context.md
+++ b/docs/querying/query-context.md
@@ -66,7 +66,7 @@ See [SQL query context](sql-query-context.md) for other query
context parameters
|`debug`| `false` | Flag indicating whether to enable debugging outputs for
the query. When set to false, no additional logs will be produced (logs
produced will be entirely dependent on your logging level). When set to true,
the following addition logs will be produced:<br />- Log the stack trace of the
exception (if any) produced by the query |
|`setProcessingThreadNames`|`true`| Whether processing thread names will be
set to `queryType_dataSource_intervals` while processing a query. This aids in
interpreting thread dumps, and is on by default. Query overhead can be reduced
slightly by setting this to `false`. This has a tiny effect in most scenarios,
but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. |
|`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge
two Project operators when inlining expressions causes complexity to increase.
Implemented as a workaround to exception `There are not enough rules to produce
a node with desired properties: convention=DRUID, sort=[]` thrown after
rejecting the merge of two projects.|
-|`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should
be queried by brokers. Clone servers are created by the `cloneServers`
Coordinator dynamic configuration. Possible values are `excludeClones`,
`includeClones` and `clonesPreferred`. `excludeClones` means that clone
Historicals are not queried by the broker. `clonesPreferred` indicates that
when given a choice between the clone Historical and the original Historical
which is being cloned, the broker chooses the [...]
+|`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should
be queried by brokers. Clone servers are created by the `cloneServers`
Coordinator dynamic configuration. Possible values are `excludeClones`,
`includeClones` and `preferClones`. `excludeClones` means that clone
Historicals are not queried by the broker. `preferClones` indicates that when
given a choice between the clone Historical and the original Historical which
is being cloned, the broker chooses the clones [...]
## Parameters by query type
diff --git
a/server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java
b/server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java
index 5334fff0731..fd6957bc4e6 100644
---
a/server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java
+++
b/server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java
@@ -166,7 +166,7 @@ public class BrokerViewOfCoordinatorConfig implements
HistoricalFilter
// Don't remove either.
return Set.of();
default:
- throw DruidException.defensive("Unexpected value: [%s]",
cloneQueryMode);
+ throw DruidException.defensive("Unexpected value of
cloneQueryMode[%s]", cloneQueryMode);
}
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
index 939686256ee..70b4e0a950b 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
@@ -105,7 +105,7 @@ public class CloneHistoricals implements CoordinatorDuty
if (!targetProjectedSegments.contains(segment) &&
loadQueueManager.loadSegment(segment, targetServer, SegmentAction.LOAD)) {
stats.add(
Stats.Segments.ASSIGNED_TO_CLONE,
- RowKey.of(Dimension.SERVER, targetServer.getServer().getHost()),
+ RowKey.of(Dimension.SERVER, targetServer.getServer().getName()),
1L
);
}
@@ -116,7 +116,7 @@ public class CloneHistoricals implements CoordinatorDuty
if (!sourceProjectedSegments.contains(segment) &&
loadQueueManager.dropSegment(segment, targetServer)) {
stats.add(
Stats.Segments.DROPPED_FROM_CLONE,
- RowKey.of(Dimension.SERVER, targetServer.getServer().getHost()),
+ RowKey.of(Dimension.SERVER, targetServer.getServer().getName()),
1L
);
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
index 0571245c985..f4ae1b5f8a9 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
@@ -186,4 +186,14 @@ public class Stats
public static final CoordinatorStat COMPUTE_THREADS =
CoordinatorStat.toDebugOnly("balancerComputeThreads");
public static final CoordinatorStat MAX_TO_MOVE =
CoordinatorStat.toDebugOnly("maxToMove");
}
+
+ public static class Configuration
+ {
+ public static final CoordinatorStat BROKER_SYNC_TIME
+ = CoordinatorStat.toDebugAndEmit("brokerSyncTime",
"config/brokerSync/time");
+ public static final CoordinatorStat TOTAL_SYNC_TIME
+ = CoordinatorStat.toDebugAndEmit("totalBrokerSyncTime",
"config/brokerSync/total/time");
+ public static final CoordinatorStat BROKER_SYNC_ERROR
+ = CoordinatorStat.toDebugAndEmit("configSyncFailure",
"config/brokerSync/error");
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/server/http/BrokerSyncStatus.java
b/server/src/main/java/org/apache/druid/server/http/BrokerSyncStatus.java
index 483be1db5bd..15970a8bf4a 100644
--- a/server/src/main/java/org/apache/druid/server/http/BrokerSyncStatus.java
+++ b/server/src/main/java/org/apache/druid/server/http/BrokerSyncStatus.java
@@ -32,25 +32,25 @@ public class BrokerSyncStatus
{
private final String host;
private final int port;
- private final long syncTimeInMs;
+ private final long lastSyncTimestampMillis;
@JsonCreator
public BrokerSyncStatus(
@JsonProperty("host") String host,
@JsonProperty("port") int port,
- @JsonProperty("syncTimeInMs") long syncTimeInMs
+ @JsonProperty("lastSyncTimestampMillis") long lastSyncTimestampMillis
)
{
this.host = host;
this.port = port;
- this.syncTimeInMs = syncTimeInMs;
+ this.lastSyncTimestampMillis = lastSyncTimestampMillis;
}
- public BrokerSyncStatus(ServiceLocation broker, long syncTimeInMs)
+ public BrokerSyncStatus(ServiceLocation broker, long lastSyncTimestampMillis)
{
this.host = broker.getHost();
this.port = broker.getTlsPort() > 0 ? broker.getTlsPort() :
broker.getPlaintextPort();
- this.syncTimeInMs = syncTimeInMs;
+ this.lastSyncTimestampMillis = lastSyncTimestampMillis;
}
@JsonProperty
@@ -66,9 +66,9 @@ public class BrokerSyncStatus
}
@JsonProperty
- public long getSyncTimeInMs()
+ public long getLastSyncTimestampMillis()
{
- return syncTimeInMs;
+ return lastSyncTimestampMillis;
}
@Override
@@ -96,7 +96,7 @@ public class BrokerSyncStatus
return "BrokerSyncStatus{" +
"host='" + host + '\'' +
", port=" + port +
- ", syncTimeInMs=" + syncTimeInMs +
+ ", lastSyncTimestampMillis=" + lastSyncTimestampMillis +
'}';
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigSyncer.java
b/server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigSyncer.java
index cdf4611a365..1e286219aeb 100644
---
a/server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigSyncer.java
+++
b/server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigSyncer.java
@@ -20,6 +20,7 @@
package org.apache.druid.server.http;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import org.apache.druid.client.broker.BrokerClient;
@@ -29,8 +30,12 @@ import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.rpc.FixedServiceLocator;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocation;
@@ -38,15 +43,19 @@ import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.CoordinatorConfigManager;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.stats.CoordinatorStat;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.apache.druid.server.coordinator.stats.Stats;
import javax.annotation.Nullable;
+import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
/**
* Updates all brokers with the latest coordinator dynamic config.
@@ -61,6 +70,7 @@ public class CoordinatorDynamicConfigSyncer
private final ServiceClientFactory clientFactory;
private final ScheduledExecutorService exec;
+ private final ServiceEmitter emitter;
private @Nullable Future<?> syncFuture = null;
@GuardedBy("this")
@@ -72,7 +82,8 @@ public class CoordinatorDynamicConfigSyncer
@EscalatedGlobal final ServiceClientFactory clientFactory,
final CoordinatorConfigManager configManager,
@Json final ObjectMapper jsonMapper,
- final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider
+ final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
+ final ServiceEmitter emitter
)
{
this.clientFactory = clientFactory;
@@ -81,6 +92,7 @@ public class CoordinatorDynamicConfigSyncer
this.druidNodeDiscovery = druidNodeDiscoveryProvider;
this.exec =
Execs.scheduledSingleThreaded("CoordinatorDynamicConfigSyncer-%d");
this.inSyncBrokers = ConcurrentHashMap.newKeySet();
+ this.emitter = emitter;
}
/**
@@ -95,12 +107,19 @@ public class CoordinatorDynamicConfigSyncer
* Push the latest coordinator dynamic config, provided by the configManager
to all currently known Brokers. Also
* invalidates the set of inSyncBrokers if the config has changed.
*/
- private void broadcastConfigToBrokers()
+ @VisibleForTesting
+ void broadcastConfigToBrokers()
{
invalidateInSyncBrokersIfNeeded();
- for (ServiceLocation broker : getKnownBrokers()) {
+ final Stopwatch stopwatch = Stopwatch.createStarted();
+ for (DiscoveryDruidNode broker : getKnownBrokers()) {
pushConfigToBroker(broker);
}
+ emitStat(
+ Stats.Configuration.TOTAL_SYNC_TIME,
+ RowKey.empty(),
+ stopwatch.millisElapsed()
+ );
}
/**
@@ -136,12 +155,20 @@ public class CoordinatorDynamicConfigSyncer
}
}
+ @LifecycleStop
+ public void stop()
+ {
+ exec.shutdownNow();
+ }
+
/**
* Push the latest coordinator dynamic config, provided by the configManager
to the Broker at the brokerLocation
* param.
*/
- private void pushConfigToBroker(ServiceLocation brokerLocation)
+ private void pushConfigToBroker(DiscoveryDruidNode broker)
{
+ final Stopwatch stopwatch = Stopwatch.createStarted();
+ final ServiceLocation brokerLocation =
CoordinatorDynamicConfigSyncer.convertDiscoveryNodeToServiceLocation(broker);
final BrokerClient brokerClient = new BrokerClientImpl(
clientFactory.makeClient(
NodeRole.BROKER.getJsonName(),
@@ -161,19 +188,26 @@ public class CoordinatorDynamicConfigSyncer
catch (Exception e) {
// Catch and ignore the exception, wait for the next sync.
log.error(e, "Exception while syncing dynamic configuration to
broker[%s]", brokerLocation);
+ emitStat(
+ Stats.Configuration.BROKER_SYNC_ERROR,
+ RowKey.with(Dimension.SERVER,
broker.getDruidNode().getHostAndPortToUse()).build(),
+ 1
+ );
}
+ emitStat(
+ Stats.Configuration.BROKER_SYNC_TIME,
+ RowKey.with(Dimension.SERVER,
broker.getDruidNode().getHostAndPortToUse()).build(),
+ stopwatch.millisElapsed()
+ );
}
/**
- * Returns a list of {@link ServiceLocation} for all brokers currently known
to the druidNodeDiscovery.
+ * Returns a collection of {@link DiscoveryDruidNode} for all brokers
currently known to the druidNodeDiscovery.
*/
- private Set<ServiceLocation> getKnownBrokers()
+ private Collection<DiscoveryDruidNode> getKnownBrokers()
{
return druidNodeDiscovery.getForNodeRole(NodeRole.BROKER)
- .getAllNodes()
- .stream()
-
.map(CoordinatorDynamicConfigSyncer::convertDiscoveryNodeToServiceLocation)
- .collect(Collectors.toSet());
+ .getAllNodes();
}
/**
@@ -218,4 +252,13 @@ public class CoordinatorDynamicConfigSyncer
""
);
}
+
+ private void emitStat(CoordinatorStat stat, RowKey rowKey, long value)
+ {
+ ServiceMetricEvent.Builder eventBuilder = new ServiceMetricEvent.Builder();
+ rowKey.getValues().forEach(
+ (dim, dimValue) -> eventBuilder.setDimension(dim.reportedName(),
dimValue)
+ );
+ emitter.emit(eventBuilder.setMetric(stat.getMetricName(), value));
+ }
}
diff --git
a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigSyncerTest.java
b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigSyncerTest.java
new file mode 100644
index 00000000000..2764ab0e6f1
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigSyncerTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.google.common.util.concurrent.Futures;
+import org.apache.druid.discovery.DiscoveryDruidNode;
+import org.apache.druid.discovery.DruidNodeDiscovery;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.http.client.response.BytesFullResponseHolder;
+import org.apache.druid.rpc.RequestBuilder;
+import org.apache.druid.rpc.ServiceClient;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.coordinator.CoordinatorConfigManager;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+
+import java.util.List;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class CoordinatorDynamicConfigSyncerTest
+{
+ private CoordinatorDynamicConfigSyncer target;
+
+ private ServiceClient serviceClient;
+ private CoordinatorConfigManager coordinatorConfigManager;
+ private DruidNodeDiscovery druidNodeDiscovery;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ serviceClient = mock(ServiceClient.class);
+
+ BytesFullResponseHolder holder = mock(BytesFullResponseHolder.class);
+ doReturn(HttpResponseStatus.OK).when(holder).getStatus();
+ doReturn(Futures.immediateFuture(holder))
+ .when(serviceClient).asyncRequest(ArgumentMatchers.any(),
ArgumentMatchers.any());
+
+ coordinatorConfigManager = mock(CoordinatorConfigManager.class);
+ DruidNodeDiscoveryProvider provider =
mock(DruidNodeDiscoveryProvider.class);
+ druidNodeDiscovery = mock(DruidNodeDiscovery.class);
+
doReturn(druidNodeDiscovery).when(provider).getForNodeRole(NodeRole.BROKER);
+
+ target = new CoordinatorDynamicConfigSyncer(
+ (serviceName, serviceLocator, retryPolicy) -> serviceClient,
+ coordinatorConfigManager,
+ DefaultObjectMapper.INSTANCE,
+ provider,
+ mock(ServiceEmitter.class)
+ );
+ }
+
+ @Test
+ public void testSync()
+ {
+ CoordinatorDynamicConfig config = CoordinatorDynamicConfig
+ .builder()
+ .withMaxSegmentsToMove(105)
+ .withReplicantLifetime(500)
+ .withReplicationThrottleLimit(5)
+ .build();
+
+ doReturn(config).when(coordinatorConfigManager).getCurrentDynamicConfig();
+ List<DiscoveryDruidNode> nodes = List.of(
+ new DiscoveryDruidNode(
+ new DruidNode("service", "host", false, 8080, null, true, false),
+ NodeRole.BROKER,
+ null,
+ null
+ )
+ );
+ doReturn(nodes).when(druidNodeDiscovery).getAllNodes();
+
+ target.broadcastConfigToBrokers();
+ RequestBuilder requestBuilder = new RequestBuilder(HttpMethod.POST,
"/druid-internal/v1/config/coordinator")
+ .jsonContent(DefaultObjectMapper.INSTANCE, config);
+ verify(serviceClient).asyncRequest(eq(requestBuilder),
ArgumentMatchers.any());
+ }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index 993f3f7bf4c..cec857b4969 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -226,7 +226,7 @@ public class CliCoordinator extends ServerRunnable
binder.bind(DruidCoordinatorConfig.class);
binder.bind(RedirectFilter.class).in(LazySingleton.class);
-
binder.bind(CoordinatorDynamicConfigSyncer.class).in(LazySingleton.class);
+
binder.bind(CoordinatorDynamicConfigSyncer.class).in(ManageLifecycle.class);
if (beOverlord) {
binder.bind(RedirectInfo.class).to(CoordinatorOverlordRedirectInfo.class).in(LazySingleton.class);
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]