This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 155fde33ff Add metrics to SegmentMetadataCache refresh (#14453)
155fde33ff is described below
commit 155fde33ff7cccdb7dfd9ece3b3fd8f800f5948f
Author: Rishabh Singh <[email protected]>
AuthorDate: Fri Jun 23 16:51:08 2023 +0530
Add metrics to SegmentMetadataCache refresh (#14453)
New metrics:
- `segment/metadatacache/refresh/time`: time taken to refresh segments per
datasource
- `segment/metadatacache/refresh/count`: number of segments being refreshed
per datasource
---
docs/operations/metrics.md | 3 ++
.../sql/calcite/schema/SegmentMetadataCache.java | 16 +++++++-
.../calcite/schema/SegmentMetadataCacheTest.java | 48 ++++++++++++++++++++++
3 files changed, 65 insertions(+), 2 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 16aaca9083..f85cbec77d 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -66,6 +66,9 @@ Metrics may have additional dimensions beyond those listed
above.
|`sqlQuery/bytes`|Number of bytes returned in the SQL query response.|`id`,
`nativeQueryIds`, `dataSource`, `remoteAddress`, `success`, `engine`| |
|`init/serverview/time`|Time taken to initialize the broker server view.
Useful to detect if brokers are taking too long to start.||Depends on the
number of segments.|
|`init/metadatacache/time`|Time taken to initialize the broker segment
metadata cache. Useful to detect if brokers are taking too long to
start||Depends on the number of segments.|
+|`segment/metadatacache/refresh/count`|Number of segments to refresh in broker segment metadata cache.|`dataSource`| |
+|`segment/metadatacache/refresh/time`|Time taken to refresh segments in broker segment metadata cache.|`dataSource`| |
+
### Historical
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java
index 71c19734ec..ea3dc39567 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Interner;
@@ -48,6 +49,7 @@ import
org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.TableDataSource;
@@ -88,6 +90,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -707,6 +710,8 @@ public class SegmentMetadataCache
private Set<SegmentId> refreshSegmentsForDataSource(final String dataSource,
final Set<SegmentId> segments)
throws IOException
{
+ final Stopwatch stopwatch = Stopwatch.createStarted();
+
if (!segments.stream().allMatch(segmentId ->
segmentId.getDataSource().equals(dataSource))) {
// Sanity check. We definitely expect this to pass.
throw new ISE("'segments' must all match 'dataSource'!");
@@ -714,7 +719,10 @@ public class SegmentMetadataCache
log.debug("Refreshing metadata for dataSource[%s].", dataSource);
- final long startTime = System.currentTimeMillis();
+ final ServiceMetricEvent.Builder builder =
+ new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE,
dataSource);
+
+ emitter.emit(builder.build("segment/metadatacache/refresh/count",
segments.size()));
// Segment id string -> SegmentId object.
final Map<String, SegmentId> segmentIdMap = Maps.uniqueIndex(segments,
SegmentId::toString);
@@ -783,10 +791,14 @@ public class SegmentMetadataCache
yielder.close();
}
+ long refreshDurationMillis = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+ emitter.emit(builder.build("segment/metadatacache/refresh/time",
refreshDurationMillis));
+
log.debug(
"Refreshed metadata for dataSource [%s] in %,d ms (%d segments
queried, %d segments left).",
dataSource,
- System.currentTimeMillis() - startTime,
+ refreshDurationMillis,
retVal.size(),
segments.size() - retVal.size()
);
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java
index fb55d22710..0414878b4a 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java
@@ -39,6 +39,8 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.TableDataSource;
@@ -1447,6 +1449,52 @@ public class SegmentMetadataCacheTest extends
SegmentMetadataCacheCommon
Assert.assertNull(schema.getDatasource("wat"));
}
+ @Test
+ public void testRefreshShouldEmitMetrics() throws InterruptedException,
IOException
+ {
+ String datasource = "xyz";
+ CountDownLatch addSegmentLatch = new CountDownLatch(2);
+ StubServiceEmitter emitter = new StubServiceEmitter("broker", "host");
+ SegmentMetadataCache schema = new SegmentMetadataCache(
+ CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
+ serverView,
+ segmentManager,
+ new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
+ SEGMENT_CACHE_CONFIG_DEFAULT,
+ new NoopEscalator(),
+ new BrokerInternalQueryConfig(),
+ emitter
+ )
+ {
+ @Override
+ protected void addSegment(final DruidServerMetadata server, final
DataSegment segment)
+ {
+ super.addSegment(server, segment);
+ if (datasource.equals(segment.getDataSource())) {
+ addSegmentLatch.countDown();
+ }
+ }
+
+ @Override
+ void removeSegment(final DataSegment segment)
+ {
+ super.removeSegment(segment);
+ }
+ };
+
+ List<DataSegment> segments = ImmutableList.of(
+ newSegment(datasource, 1),
+ newSegment(datasource, 2)
+ );
+ serverView.addSegment(segments.get(0), ServerType.HISTORICAL);
+ serverView.addSegment(segments.get(1), ServerType.REALTIME);
+ Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
+
schema.refresh(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()),
Sets.newHashSet(datasource));
+
+ emitter.verifyEmitted("segment/metadatacache/refresh/time",
ImmutableMap.of(DruidMetrics.DATASOURCE, datasource), 1);
+ emitter.verifyEmitted("segment/metadatacache/refresh/count",
ImmutableMap.of(DruidMetrics.DATASOURCE, datasource), 1);
+ }
+
private static DataSegment newSegment(String datasource, int partitionId)
{
return new DataSegment(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]