This is an automated email from the ASF dual-hosted git repository.
abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 8307d8a26c6 add defaultMetrics for logging emitter (#19030)
8307d8a26c6 is described below
commit 8307d8a26c6b7286a8a57b85940a7f29d3d43248
Author: sarangv <[email protected]>
AuthorDate: Sat Mar 7 12:37:21 2026 -0800
add defaultMetrics for logging emitter (#19030)
Fixes https://github.com/apache/druid/issues/19021.
Operators can limit which metrics the logging emitter writes by setting
druid.emitter.logging.shouldFilterMetrics=true and, if desired,
druid.emitter.logging.allowedMetricsPath to a JSON object file (keys = metric
names). Alerts and other non-metric events are always logged.
Co-authored-by: Sarang Vadali <[email protected]>
---
docs/configuration/index.md | 2 +
.../druid/java/util/emitter/core/Emitters.java | 10 +
.../java/util/emitter/core/LoggingEmitter.java | 103 ++++++++-
.../util/emitter/core/LoggingEmitterConfig.java | 34 +++
processing/src/main/resources/defaultMetrics.json | 249 +++++++++++++++++++++
.../emitter/core/LoggingEmitterConfigTest.java | 12 +
.../java/util/emitter/core/LoggingEmitterTest.java | 213 ++++++++++++++++++
processing/src/test/resources/defaultMetrics.json | 1 +
8 files changed, 623 insertions(+), 1 deletion(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index c370ed6e1c5..65fcd39d7e6 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -2023,6 +2023,8 @@ log4j config to route these logs to different sources
based on the feed of the e
|--------|-----------|--------|
|`druid.emitter.logging.loggerClass`|The class used for
logging.|`org.apache.druid.java.util.emitter.core.LoggingEmitter`|
|`druid.emitter.logging.logLevel`|Choices: debug, info, warn, error. The log
level at which message are logged.|info|
+|`druid.emitter.logging.shouldFilterMetrics`|When true, only metrics listed in
the allow list are emitted; non-metric events (e.g. alerts) are always emitted.
When false, all events are logged (backward-compatible).|false|
+|`druid.emitter.logging.allowedMetricsPath`|Path to a JSON file whose keys are
the allowed metric names. Only used when `shouldFilterMetrics` is true. If null
or empty, the bundled classpath resource `defaultMetrics.json` is used. If a
path is set but the file is missing, a warning is logged and the emitter falls
back to the default classpath resource.|null|
#### HTTP emitter module
diff --git
a/processing/src/main/java/org/apache/druid/java/util/emitter/core/Emitters.java
b/processing/src/main/java/org/apache/druid/java/util/emitter/core/Emitters.java
index 00d424b88fc..217b2483174 100644
---
a/processing/src/main/java/org/apache/druid/java/util/emitter/core/Emitters.java
+++
b/processing/src/main/java/org/apache/druid/java/util/emitter/core/Emitters.java
@@ -140,6 +140,16 @@ public class Emitters
loggingMap.put(
"logLevel",
props.getProperty("org.apache.druid.java.util.emitter.logging.level", "debug")
);
+ if
(props.containsKey("org.apache.druid.java.util.emitter.logging.shouldFilterMetrics"))
{
+ loggingMap.put(
+ "shouldFilterMetrics",
Boolean.parseBoolean(props.getProperty("org.apache.druid.java.util.emitter.logging.shouldFilterMetrics"))
+ );
+ }
+ if
(props.containsKey("org.apache.druid.java.util.emitter.logging.allowedMetricsPath"))
{
+ loggingMap.put(
+ "allowedMetricsPath",
props.getProperty("org.apache.druid.java.util.emitter.logging.allowedMetricsPath")
+ );
+ }
return loggingMap;
}
diff --git
a/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitter.java
b/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitter.java
index afc8e07e2c2..17392515dd6 100644
---
a/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitter.java
+++
b/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitter.java
@@ -22,13 +22,26 @@ package org.apache.druid.java.util.emitter.core;
/**
*/
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.slf4j.MarkerFactory;
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -36,22 +49,100 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class LoggingEmitter implements Emitter
{
+ private static final Logger LOGGER = new Logger(LoggingEmitter.class);
+ private static final String DEFAULT_ALLOWED_METRICS_RESOURCE =
"defaultMetrics.json";
+
private final Logger log;
private final Level level;
private final ObjectMapper jsonMapper;
+ @Nullable
+ private final Set<String> allowedMetrics;
private final AtomicBoolean started = new AtomicBoolean(false);
public LoggingEmitter(LoggingEmitterConfig config, ObjectMapper jsonMapper)
{
- this(new Logger(config.getLoggerClass()),
Level.toLevel(config.getLogLevel()), jsonMapper);
+ this(
+ new Logger(config.getLoggerClass()),
+ Level.toLevel(config.getLogLevel()),
+ jsonMapper,
+ config.shouldFilterMetrics(),
+ config.getAllowedMetricsPath()
+ );
}
public LoggingEmitter(Logger log, Level level, ObjectMapper jsonMapper)
+ {
+ this(log, level, jsonMapper, false, null);
+ }
+
+ public LoggingEmitter(
+ Logger log,
+ Level level,
+ ObjectMapper jsonMapper,
+ boolean shouldFilterMetrics,
+ @Nullable String allowedMetricsPath
+ )
{
this.log = log;
this.level = level;
this.jsonMapper = jsonMapper;
+ this.allowedMetrics = shouldFilterMetrics ?
loadAllowedMetrics(allowedMetricsPath, jsonMapper) : null;
+ }
+
+ /**
+  * Loads the set of allowed metric names from the top-level keys of a JSON object.
+  * If the path is null or empty, the bundled classpath resource (defaultMetrics.json)
+  * is read. If a custom path is provided but the file is missing, a warning is logged
+  * and the default classpath resource is read instead.
+  *
+  * @param path       optional file-system path of the allow-list file; may be null or empty
+  * @param jsonMapper mapper used to parse the JSON content
+  * @return an unmodifiable view of the metric names found as keys
+  * @throws DruidException if the content cannot be read or is not a JSON object
+  */
+ private static Set<String> loadAllowedMetrics(@Nullable String path, ObjectMapper jsonMapper)
+ {
+   // Name used in error messages. Note: if the custom file was missing and the default
+   // resource was read instead, this still names the custom path.
+   final String source = Strings.isNullOrEmpty(path) ? DEFAULT_ALLOWED_METRICS_RESOURCE : path;
+   // try-with-resources closes the stream on every exit path, independent of mapper settings.
+   try (InputStream is = openAllowedMetricsStream(path)) {
+     final Map<String, Object> metricsMap = jsonMapper.readValue(is, new TypeReference<Map<String, Object>>() {});
+     if (metricsMap == null) {
+       // A document consisting of the JSON literal null parses to a null map.
+       throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                           .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                           .build("Allowed metrics file must be a JSON object with metric names as keys; found JSON null in [%s]", source);
+     }
+     return Collections.unmodifiableSet(metricsMap.keySet());
+   }
+   catch (IOException e) {
+     throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                         .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                         .build(e, "Allowed metrics file must be a JSON object with metric names as keys; failed to parse [%s]", source);
+   }
+ }
+
+ /**
+ * Opens the stream from which the allowed-metrics JSON is read.
+ *
+ * A null or empty path selects the bundled classpath resource. Otherwise the
+ * file at the given path is opened; if that raises FileNotFoundException, a
+ * warning is logged and the bundled classpath resource is used instead.
+ * The stream is returned open; this method does not close it.
+ *
+ * @param path optional file-system path of the allow-list file; may be null or empty
+ * @return an open stream over the selected source
+ */
+ private static InputStream openAllowedMetricsStream(@Nullable String path)
+ {
+ if (Strings.isNullOrEmpty(path)) {
+ LOGGER.info("Using default allowed metrics configuration from classpath
resource [%s]", DEFAULT_ALLOWED_METRICS_RESOURCE);
+ return openDefaultAllowedMetricsResource();
+ }
+ try {
+ final InputStream is = new FileInputStream(new File(path));
+ LOGGER.info("Using allowed metrics configuration at [%s]", path);
+ return is;
+ }
+ catch (FileNotFoundException e) {
+ // NOTE(review): FileInputStream documents FileNotFoundException for a path that is a
+ // directory or cannot be opened for reading, not only for an absent file, so those
+ // cases also take this fallback - confirm that is intended.
+ LOGGER.warn(e, "Allowed metrics file [%s] not found, falling back to
default classpath resource [%s]",
+ path, DEFAULT_ALLOWED_METRICS_RESOURCE);
+ return openDefaultAllowedMetricsResource();
+ }
+ }
+
+ /**
+ * Opens the bundled default allow list (defaultMetrics.json) through the class
+ * loader of LoggingEmitter.
+ *
+ * @return an open stream over the classpath resource; never null
+ * @throws DruidException with category NOT_FOUND if the class loader returns no such resource
+ */
+ private static InputStream openDefaultAllowedMetricsResource()
+ {
+ final InputStream is =
LoggingEmitter.class.getClassLoader().getResourceAsStream(DEFAULT_ALLOWED_METRICS_RESOURCE);
+ if (is == null) {
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.NOT_FOUND)
+ .build("Could not find default allowed metrics
resource [%s] on classpath", DEFAULT_ALLOWED_METRICS_RESOURCE);
+ }
+ return is;
}
@Override
@@ -95,6 +186,16 @@ public class LoggingEmitter implements Emitter
throw new RejectedExecutionException("Service not started.");
}
}
+
+ // Allowlist filtering: only applies to ServiceMetricEvents.
+ // Non-metric events (alerts, etc.) always pass through.
+ if (allowedMetrics != null && event instanceof ServiceMetricEvent) {
+ final String metricName = ((ServiceMetricEvent) event).getMetric();
+ if (!allowedMetrics.contains(metricName)) {
+ return;
+ }
+ }
+
try {
switch (level) {
case TRACE:
diff --git
a/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfig.java
b/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfig.java
index 39b85126f77..7aa3d579749 100644
---
a/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfig.java
+++
b/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfig.java
@@ -21,6 +21,7 @@ package org.apache.druid.java.util.emitter.core;
import com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
/**
@@ -35,6 +36,26 @@ public class LoggingEmitterConfig
@JsonProperty
private String logLevel = "info";
+ /**
+ * When true, only metrics listed in the allowed metrics configuration are
emitted.
+ * If {@link #allowedMetricsPath} is null/empty, the bundled default
allowlist
+ * (defaultMetrics.json on the classpath) is used. If a path is provided,
+ * it is loaded from that file instead.
+ * Defaults to false (emit all metrics, backward-compatible behavior).
+ */
+ @JsonProperty("shouldFilterMetrics")
+ private boolean shouldFilterMetrics = false;
+
+ /**
+ * Optional path to a JSON file containing an object whose keys are the allowed metric names.
+ * Only used when {@link #shouldFilterMetrics} is true.
+ * If null or empty, the bundled default resource (defaultMetrics.json) is
loaded
+ * from the classpath, mirroring how the Prometheus emitter loads its
defaultMetrics.json.
+ */
+ @JsonProperty
+ @Nullable
+ private String allowedMetricsPath = null;
+
public String getLoggerClass()
{
return loggerClass;
@@ -45,12 +66,25 @@ public class LoggingEmitterConfig
return logLevel;
}
+ public boolean shouldFilterMetrics()
+ {
+ return shouldFilterMetrics;
+ }
+
+ @Nullable
+ public String getAllowedMetricsPath()
+ {
+ return allowedMetricsPath;
+ }
+
@Override
public String toString()
{
return "LoggingEmitterConfig{" +
"loggerClass='" + loggerClass + '\'' +
", logLevel='" + logLevel + '\'' +
+ ", shouldFilterMetrics=" + shouldFilterMetrics +
+ ", allowedMetricsPath='" + allowedMetricsPath + '\'' +
'}';
}
}
diff --git a/processing/src/main/resources/defaultMetrics.json
b/processing/src/main/resources/defaultMetrics.json
new file mode 100644
index 00000000000..ea6c2fe6b5f
--- /dev/null
+++ b/processing/src/main/resources/defaultMetrics.json
@@ -0,0 +1,249 @@
+{
+ "compact/segmentAnalyzer/fetchAndProcessMillis": [],
+ "compact/task/count": [],
+ "compactTask/availableSlot/count": [],
+ "compactTask/maxSlot/count": [],
+ "coordinator/global/time": [],
+ "coordinator/time": [],
+ "groupBy/maxMergeDictionarySize": [],
+ "groupBy/maxSpilledBytes": [],
+ "groupBy/mergeDictionarySize": [],
+ "groupBy/spilledBytes": [],
+ "groupBy/spilledQueries": [],
+ "ingest/count": [],
+ "ingest/events/duplicate": [],
+ "ingest/events/messageGap": [],
+ "ingest/events/processed": [],
+ "ingest/events/processedWithError": [],
+ "ingest/events/thrownAway": [],
+ "ingest/events/unparseable": [],
+ "ingest/handoff/count": [],
+ "ingest/handoff/failed": [],
+ "ingest/handoff/time": [],
+ "ingest/input/bytes": [],
+ "ingest/kafka/avgLag": [],
+ "ingest/kafka/lag": [],
+ "ingest/kafka/maxLag": [],
+ "ingest/kafka/partitionLag": [],
+ "ingest/merge/cpu": [],
+ "ingest/merge/time": [],
+ "ingest/notices/queueSize": [],
+ "ingest/notices/time": [],
+ "ingest/pause/time": [],
+ "ingest/persists/backPressure": [],
+ "ingest/persists/count": [],
+ "ingest/persists/cpu": [],
+ "ingest/persists/failed": [],
+ "ingest/persists/time": [],
+ "ingest/rows/output": [],
+ "ingest/segments/count": [],
+ "ingest/sink/count": [],
+ "ingest/tombstones/count": [],
+ "interval/compacted/count": [],
+ "interval/skipCompact/count": [],
+ "interval/waitCompact/count": [],
+ "jetty/numOpenConnections": [],
+ "jetty/threadPool/busy": [],
+ "jetty/threadPool/idle": [],
+ "jetty/threadPool/isLowOnThreads": [],
+ "jetty/threadPool/max": [],
+ "jetty/threadPool/min": [],
+ "jetty/threadPool/queueSize": [],
+ "jetty/threadPool/ready": [],
+ "jetty/threadPool/total": [],
+ "jetty/threadPool/utilizationRate": [],
+ "jetty/threadPool/utilized": [],
+ "jvm/bufferpool/capacity": [],
+ "jvm/bufferpool/count": [],
+ "jvm/bufferpool/used": [],
+ "jvm/gc/count": [],
+ "jvm/gc/cpu": [],
+ "jvm/mem/committed": [],
+ "jvm/mem/init": [],
+ "jvm/mem/max": [],
+ "jvm/mem/used": [],
+ "jvm/pool/committed": [],
+ "jvm/pool/init": [],
+ "jvm/pool/max": [],
+ "jvm/pool/used": [],
+ "kafka/consumer/bytesConsumed": [],
+ "kafka/consumer/fetch": [],
+ "kafka/consumer/fetchLatencyAvg": [],
+ "kafka/consumer/fetchLatencyMax": [],
+ "kafka/consumer/fetchRate": [],
+ "kafka/consumer/fetchSizeAvg": [],
+ "kafka/consumer/fetchSizeMax": [],
+ "kafka/consumer/incomingBytes": [],
+ "kafka/consumer/outgoingBytes": [],
+ "kafka/consumer/recordsConsumed": [],
+ "kafka/consumer/recordsLag": [],
+ "kafka/consumer/recordsPerRequestAvg": [],
+ "kill/eligibleUnusedSegments/count": [],
+ "kill/pendingSegments/count": [],
+ "kill/task/count": [],
+ "killTask/availableSlot/count": [],
+ "killTask/maxSlot/count": [],
+ "mergeBuffer/acquisitionTimeNs": [],
+ "mergeBuffer/maxAcquisitionTimeNs": [],
+ "mergeBuffer/pendingRequests": [],
+ "mergeBuffer/queries": [],
+ "mergeBuffer/used": [],
+ "metadata/kill/audit/count": [],
+ "metadata/kill/compaction/count": [],
+ "metadata/kill/datasource/count": [],
+ "metadata/kill/rule/count": [],
+ "metadata/kill/supervisor/count": [],
+ "metadatacache/backfill/count": [],
+ "metadatacache/init/time": [],
+ "metadatacache/refresh/count": [],
+ "metadatacache/refresh/time": [],
+ "metadatacache/schemaPoll/count": [],
+ "metadatacache/schemaPoll/failed": [],
+ "metadatacache/schemaPoll/time": [],
+ "query/bytes": [],
+ "query/cache/delta/averageBytes": [],
+ "query/cache/delta/errors": [],
+ "query/cache/delta/evictions": [],
+ "query/cache/delta/hitRate": [],
+ "query/cache/delta/hits": [],
+ "query/cache/delta/misses": [],
+ "query/cache/delta/numEntries": [],
+ "query/cache/delta/put/error": [],
+ "query/cache/delta/put/ok": [],
+ "query/cache/delta/put/oversized": [],
+ "query/cache/delta/sizeBytes": [],
+ "query/cache/delta/timeouts": [],
+ "query/cache/total/averageBytes": [],
+ "query/cache/total/errors": [],
+ "query/cache/total/evictions": [],
+ "query/cache/total/hitRate": [],
+ "query/cache/total/hits": [],
+ "query/cache/total/misses": [],
+ "query/cache/total/numEntries": [],
+ "query/cache/total/put/error": [],
+ "query/cache/total/put/ok": [],
+ "query/cache/total/put/oversized": [],
+ "query/cache/total/sizeBytes": [],
+ "query/cache/total/timeouts": [],
+ "query/count": [],
+ "query/cpu/time": [],
+ "query/failed/count": [],
+ "query/interrupted/count": [],
+ "query/node/bytes": [],
+ "query/node/time": [],
+ "query/node/ttfb": [],
+ "query/segment/time": [],
+ "query/segmentAndCache/time": [],
+ "query/success/count": [],
+ "query/time": [],
+ "query/timeout/count": [],
+ "query/wait/time": [],
+ "schemacache/finalizedSchemaPayload/count": [],
+ "schemacache/finalizedSegmentMetadata/count": [],
+ "schemacache/inTransitSMQPublishedResults/count": [],
+ "schemacache/inTransitSMQResults/count": [],
+ "schemacache/realtime/count": [],
+ "segment/added/bytes": [],
+ "segment/assignSkipped/count": [],
+ "segment/assigned/count": [],
+ "segment/availableDeepStorageOnly/count": [],
+ "segment/compacted/bytes": [],
+ "segment/compacted/count": [],
+ "segment/count": [],
+ "segment/deleted/count": [],
+ "segment/dropQueue/count": [],
+ "segment/dropSkipped/count": [],
+ "segment/dropped/count": [],
+ "segment/loadQueue/assigned": [],
+ "segment/loadQueue/cancelled": [],
+ "segment/loadQueue/count": [],
+ "segment/loadQueue/failed": [],
+ "segment/loadQueue/size": [],
+ "segment/loadQueue/success": [],
+ "segment/max": [],
+ "segment/moveSkipped/count": [],
+ "segment/moved/bytes": [],
+ "segment/moved/count": [],
+ "segment/nuked/bytes": [],
+ "segment/overShadowed/count": [],
+ "segment/pendingDelete": [],
+ "segment/scan/active": [],
+ "segment/scan/pending": [],
+ "segment/size": [],
+ "segment/skipCompact/bytes": [],
+ "segment/skipCompact/count": [],
+ "segment/unavailable/count": [],
+ "segment/underReplicated/count": [],
+ "segment/unneeded/count": [],
+ "segment/unneededEternityTombstone/count": [],
+ "segment/used": [],
+ "segment/usedPercent": [],
+ "segment/waitCompact/bytes": [],
+ "segment/waitCompact/count": [],
+ "serverview/init/time": [],
+ "serverview/sync/healthy": [],
+ "serverview/sync/unstableTime": [],
+ "service/heartbeat": [],
+ "sqlQuery/bytes": [],
+ "sqlQuery/planningTimeMs": [],
+ "sqlQuery/time": [],
+ "sys/cpu": [],
+ "sys/disk/queue": [],
+ "sys/disk/read/count": [],
+ "sys/disk/read/size": [],
+ "sys/disk/transferTime": [],
+ "sys/disk/write/count": [],
+ "sys/disk/write/size": [],
+ "sys/fs/files/count": [],
+ "sys/fs/files/free": [],
+ "sys/fs/max": [],
+ "sys/fs/used": [],
+ "sys/mem/free": [],
+ "sys/mem/max": [],
+ "sys/mem/used": [],
+ "sys/net/read/dropped": [],
+ "sys/net/read/errors": [],
+ "sys/net/read/packets": [],
+ "sys/net/read/size": [],
+ "sys/net/write/collisions": [],
+ "sys/net/write/errors": [],
+ "sys/net/write/packets": [],
+ "sys/net/write/size": [],
+ "sys/storage/used": [],
+ "sys/swap/free": [],
+ "sys/swap/max": [],
+ "sys/swap/pageIn": [],
+ "sys/swap/pageOut": [],
+ "sys/tcpv4/activeOpens": [],
+ "sys/tcpv4/attemptFails": [],
+ "sys/tcpv4/estabResets": [],
+ "sys/tcpv4/in/errs": [],
+ "sys/tcpv4/in/segs": [],
+ "sys/tcpv4/out/rsts": [],
+ "sys/tcpv4/out/segs": [],
+ "sys/tcpv4/passiveOpens": [],
+ "sys/tcpv4/retrans/segs": [],
+ "sys/uptime": [],
+ "task/action/batch/attempts": [],
+ "task/action/batch/queueTime": [],
+ "task/action/batch/runTime": [],
+ "task/action/batch/size": [],
+ "task/action/failed/count": [],
+ "task/action/run/time": [],
+ "task/action/success/count": [],
+ "task/autoScaler/requiredCount": [],
+ "task/failed/count": [],
+ "task/pending/count": [],
+ "task/pending/time": [],
+ "task/run/time": [],
+ "task/running/count": [],
+ "task/segmentAvailability/wait/time": [],
+ "task/success/count": [],
+ "task/waiting/count": [],
+ "tier/historical/count": [],
+ "tier/replication/factor": [],
+ "tier/required/capacity": [],
+ "tier/total/capacity": [],
+ "zk/connected": [],
+ "zk/reconnect/time": []
+}
\ No newline at end of file
diff --git
a/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfigTest.java
b/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfigTest.java
index 9aa93d5d0f5..b7189699b61 100644
---
a/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfigTest.java
+++
b/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfigTest.java
@@ -38,6 +38,8 @@ public class LoggingEmitterConfigTest
);
Assert.assertEquals("getLoggerClass", LoggingEmitter.class.getName(),
config.getLoggerClass());
Assert.assertEquals("getLogLevel", "info", config.getLogLevel());
+ Assert.assertFalse("shouldFilterMetrics", config.shouldFilterMetrics());
+ Assert.assertNull("getAllowedMetricsPath", config.getAllowedMetricsPath());
}
@Test
@@ -52,6 +54,8 @@ public class LoggingEmitterConfigTest
Assert.assertEquals("getLoggerClass", LoggingEmitter.class.getName(),
config.getLoggerClass());
Assert.assertEquals("getLogLevel", "debug", config.getLogLevel());
+ Assert.assertFalse("shouldFilterMetrics", config.shouldFilterMetrics());
+ Assert.assertNull("getAllowedMetricsPath", config.getAllowedMetricsPath());
}
@Test
@@ -60,6 +64,8 @@ public class LoggingEmitterConfigTest
final Properties props = new Properties();
props.setProperty("org.apache.druid.java.util.emitter.loggerClass", "Foo");
props.setProperty("org.apache.druid.java.util.emitter.logLevel", "INFO");
+
props.setProperty("org.apache.druid.java.util.emitter.shouldFilterMetrics",
"true");
+ props.setProperty("org.apache.druid.java.util.emitter.allowedMetricsPath",
"/tmp/allowedMetrics.json");
final ObjectMapper objectMapper = new ObjectMapper();
final LoggingEmitterConfig config = objectMapper.convertValue(
@@ -69,6 +75,8 @@ public class LoggingEmitterConfigTest
Assert.assertEquals("getLoggerClass", "Foo", config.getLoggerClass());
Assert.assertEquals("getLogLevel", "INFO", config.getLogLevel());
+ Assert.assertTrue("shouldFilterMetrics", config.shouldFilterMetrics());
+ Assert.assertEquals("getAllowedMetricsPath", "/tmp/allowedMetrics.json",
config.getAllowedMetricsPath());
}
@Test
@@ -77,6 +85,8 @@ public class LoggingEmitterConfigTest
final Properties props = new Properties();
props.setProperty("org.apache.druid.java.util.emitter.logging.class",
"Foo");
props.setProperty("org.apache.druid.java.util.emitter.logging.level",
"INFO");
+
props.setProperty("org.apache.druid.java.util.emitter.logging.shouldFilterMetrics",
"true");
+
props.setProperty("org.apache.druid.java.util.emitter.logging.allowedMetricsPath",
"/custom/path.json");
final ObjectMapper objectMapper = new ObjectMapper();
final LoggingEmitterConfig config = objectMapper.convertValue(
@@ -86,5 +96,7 @@ public class LoggingEmitterConfigTest
Assert.assertEquals("getLoggerClass", "Foo", config.getLoggerClass());
Assert.assertEquals("getLogLevel", "INFO", config.getLogLevel());
+ Assert.assertTrue("shouldFilterMetrics", config.shouldFilterMetrics());
+ Assert.assertEquals("getAllowedMetricsPath", "/custom/path.json",
config.getAllowedMetricsPath());
}
}
diff --git
a/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterTest.java
b/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterTest.java
new file mode 100644
index 00000000000..b29cf44ef41
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.emitter.core;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.emitter.service.UnitEvent;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+
+public class LoggingEmitterTest
+{
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private List<Object> serializedObjects;
+ private ObjectMapper trackingMapper;
+ private LoggingEmitter emitter;
+
+ @Before
+ public void setUp()
+ {
+ serializedObjects = new ArrayList<>();
+ // A custom ObjectMapper that records every object passed to
writeValueAsString.
+ // This lets us detect which events actually reach the logging step (i.e.,
were NOT
+ // filtered out by the allowlist). We use Level.WARN because the WARN case
in emit()
+ // calls writeValueAsString unconditionally (no isWarnEnabled guard),
making it a
+ // reliable probe for whether an event passed the allowlist check.
+ trackingMapper = new ObjectMapper()
+ {
+ @Override
+ public String writeValueAsString(Object value) throws
JsonProcessingException
+ {
+ serializedObjects.add(value);
+ return super.writeValueAsString(value);
+ }
+ };
+ }
+
+ private LoggingEmitter createEmitter(boolean shouldFilterMetrics, String
allowedMetricsPath)
+ {
+ emitter = new LoggingEmitter(
+ new Logger(LoggingEmitter.class),
+ LoggingEmitter.Level.WARN,
+ trackingMapper,
+ shouldFilterMetrics,
+ allowedMetricsPath
+ );
+ emitter.start();
+ return emitter;
+ }
+
+ @After
+ public void tearDown()
+ {
+ if (emitter != null) {
+ emitter.close();
+ emitter = null;
+ }
+ }
+
+ /**
+ * Without filtering enabled, the emitter should log all events (backward
compatibility).
+ */
+ @Test
+ public void testEmitAllWhenFilteringDisabled()
+ {
+ createEmitter(false, null);
+
+ emitter.emit(ServiceMetricEvent.builder().setMetric("query/time",
100).build("test", "localhost"));
+ emitter.emit(ServiceMetricEvent.builder().setMetric("jvm/mem/used",
512).build("test", "localhost"));
+ emitter.emit(ServiceMetricEvent.builder().setMetric("some/random/metric",
1).build("test", "localhost"));
+
+ Assert.assertEquals("All events should be serialized (logged)", 3,
serializedObjects.size());
+ }
+
+ /**
+ * With filtering enabled and no custom path, the default classpath resource
+ * (defaultMetrics.json) should be loaded. Metrics in the default list
+ * are emitted; unlisted metrics are dropped.
+ */
+ @Test
+ public void testFilterWithDefaultResource()
+ {
+ createEmitter(true, null);
+
+ // "query/time" is in the default allowed metrics list
+ emitter.emit(ServiceMetricEvent.builder().setMetric("query/time",
100).build("test", "localhost"));
+ // "some/unlisted/metric" is NOT in the default list
+
emitter.emit(ServiceMetricEvent.builder().setMetric("some/unlisted/metric",
1).build("test", "localhost"));
+
+ Assert.assertEquals("Only the allowed metric should be serialized", 1,
serializedObjects.size());
+ }
+
+ /**
+ * With filtering enabled and a custom file path, only metrics from that
file are emitted.
+ */
+ @Test
+ public void testFilterWithCustomFilePath() throws IOException
+ {
+ final File allowFile = createAllowlistFile("{\"query/time\": [],
\"query/bytes\": []}");
+ createEmitter(true, allowFile.getAbsolutePath());
+
+ emitter.emit(ServiceMetricEvent.builder().setMetric("query/time",
100).build("test", "localhost"));
+ emitter.emit(ServiceMetricEvent.builder().setMetric("jvm/mem/used",
512).build("test", "localhost"));
+ emitter.emit(ServiceMetricEvent.builder().setMetric("query/bytes",
2048).build("test", "localhost"));
+
+ Assert.assertEquals("Only allowed metrics should be serialized", 2,
serializedObjects.size());
+ }
+
+ /**
+ * Non-metric events (like UnitEvent) should always pass through the filter,
+ * even when filtering is enabled.
+ */
+ @Test
+ public void testNonMetricEventsAlwaysPassThrough() throws IOException
+ {
+ final File allowFile = createAllowlistFile("{\"query/time\": []}");
+ createEmitter(true, allowFile.getAbsolutePath());
+
+ // This is NOT a ServiceMetricEvent, so it should bypass the allowlist
filter
+ emitter.emit(new UnitEvent("alerts", 42));
+
+ Assert.assertEquals("Non-metric events should bypass the allowlist
filter", 1, serializedObjects.size());
+ }
+
+ /**
+ * When a custom path is specified but the file is missing, the emitter
falls back
+ * to the default classpath resource and emits successfully.
+ */
+ @Test
+ public void testMissingCustomPathFallsBackToDefault()
+ {
+ createEmitter(true, "/nonexistent/path/to/allowedMetrics.json");
+
+ // Fallback to default should allow "query/time" (in default list) and
drop unlisted metrics
+ emitter.emit(ServiceMetricEvent.builder().setMetric("query/time",
100).build("test", "localhost"));
+
emitter.emit(ServiceMetricEvent.builder().setMetric("some/unlisted/metric",
1).build("test", "localhost"));
+
+ Assert.assertEquals("Fallback to default should allow listed metrics
only", 1, serializedObjects.size());
+ }
+
+ /**
+ * An empty allowlist should block all metric events but still pass
non-metric events.
+ */
+ @Test
+ public void testEmptyAllowlistBlocksAllMetrics() throws IOException
+ {
+ final File allowFile = createAllowlistFile("{}");
+ createEmitter(true, allowFile.getAbsolutePath());
+
+ emitter.emit(ServiceMetricEvent.builder().setMetric("query/time",
100).build("test", "localhost"));
+ emitter.emit(new UnitEvent("alerts", 42));
+
+ Assert.assertEquals("Only non-metric event should pass through", 1,
serializedObjects.size());
+ }
+
+ /**
+ * When shouldFilterMetrics is false, even if an allowedMetricsPath is
provided, filtering is not applied.
+ */
+ @Test
+ public void testFilterDisabledIgnoresPath() throws IOException
+ {
+ final File allowFile = createAllowlistFile("{\"query/time\": []}");
+ createEmitter(false, allowFile.getAbsolutePath());
+
+ emitter.emit(ServiceMetricEvent.builder().setMetric("query/time",
100).build("test", "localhost"));
+ emitter.emit(ServiceMetricEvent.builder().setMetric("jvm/mem/used",
512).build("test", "localhost"));
+
+ Assert.assertEquals("All events should pass when filtering is disabled",
2, serializedObjects.size());
+ }
+
+ private File createAllowlistFile(String jsonContent) throws IOException
+ {
+ final File file = tempFolder.newFile("allowedMetrics.json");
+ try (Writer writer = new
OutputStreamWriter(Files.newOutputStream(file.toPath()),
StandardCharsets.UTF_8)) {
+ writer.write(jsonContent);
+ }
+ return file;
+ }
+}
diff --git a/processing/src/test/resources/defaultMetrics.json
b/processing/src/test/resources/defaultMetrics.json
new file mode 100644
index 00000000000..22a9d0329fc
--- /dev/null
+++ b/processing/src/test/resources/defaultMetrics.json
@@ -0,0 +1 @@
+{"compact/segmentAnalyzer/fetchAndProcessMillis":[],"compact/task/count":[],"compactTask/availableSlot/count":[],"compactTask/maxSlot/count":[],"coordinator/global/time":[],"coordinator/time":[],"groupBy/maxMergeDictionarySize":[],"groupBy/maxSpilledBytes":[],"groupBy/mergeDictionarySize":[],"groupBy/spilledBytes":[],"groupBy/spilledQueries":[],"ingest/count":[],"ingest/events/duplicate":[],"ingest/events/messageGap":[],"ingest/events/processed":[],"ingest/events/processedWithError":[],"
[...]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]