This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8307d8a26c6 add defaultMetrics for logging emitter (#19030)
8307d8a26c6 is described below

commit 8307d8a26c6b7286a8a57b85940a7f29d3d43248
Author: sarangv <[email protected]>
AuthorDate: Sat Mar 7 12:37:21 2026 -0800

    add defaultMetrics for logging emitter (#19030)
    
    Fixes https://github.com/apache/druid/issues/19021.
    
    Operators can limit which metrics the logging emitter writes by setting 
druid.emitter.logging.shouldFilterMetrics=true and, if desired, 
druid.emitter.logging.allowedMetricsPath to a JSON object file (keys = metric 
names). Alerts and other non-metric events are always logged.
    
    
    Co-authored-by: Sarang Vadali <[email protected]>
---
 docs/configuration/index.md                        |   2 +
 .../druid/java/util/emitter/core/Emitters.java     |  10 +
 .../java/util/emitter/core/LoggingEmitter.java     | 103 ++++++++-
 .../util/emitter/core/LoggingEmitterConfig.java    |  34 +++
 processing/src/main/resources/defaultMetrics.json  | 249 +++++++++++++++++++++
 .../emitter/core/LoggingEmitterConfigTest.java     |  12 +
 .../java/util/emitter/core/LoggingEmitterTest.java | 213 ++++++++++++++++++
 processing/src/test/resources/defaultMetrics.json  |   1 +
 8 files changed, 623 insertions(+), 1 deletion(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index c370ed6e1c5..65fcd39d7e6 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -2023,6 +2023,8 @@ log4j config to route these logs to different sources 
based on the feed of the e
 |--------|-----------|--------|
 |`druid.emitter.logging.loggerClass`|The class used for 
logging.|`org.apache.druid.java.util.emitter.core.LoggingEmitter`|
 |`druid.emitter.logging.logLevel`|Choices: debug, info, warn, error. The log 
level at which messages are logged.|info|
+|`druid.emitter.logging.shouldFilterMetrics`|When true, only metrics listed in 
the allow list are emitted; non-metric events (e.g. alerts) are always emitted. 
When false, all events are logged (backward-compatible).|false|
+|`druid.emitter.logging.allowedMetricsPath`|Path to a JSON file whose keys are 
the allowed metric names. Only used when `shouldFilterMetrics` is true. If null 
or empty, the bundled classpath resource `defaultMetrics.json` is used. If a 
path is set but the file is missing, a warning is logged and the emitter falls 
back to the default classpath resource.|null|
 
 #### HTTP emitter module
 
diff --git 
a/processing/src/main/java/org/apache/druid/java/util/emitter/core/Emitters.java
 
b/processing/src/main/java/org/apache/druid/java/util/emitter/core/Emitters.java
index 00d424b88fc..217b2483174 100644
--- 
a/processing/src/main/java/org/apache/druid/java/util/emitter/core/Emitters.java
+++ 
b/processing/src/main/java/org/apache/druid/java/util/emitter/core/Emitters.java
@@ -140,6 +140,16 @@ public class Emitters
     loggingMap.put(
         "logLevel", 
props.getProperty("org.apache.druid.java.util.emitter.logging.level", "debug")
     );
+    if 
(props.containsKey("org.apache.druid.java.util.emitter.logging.shouldFilterMetrics"))
 {
+      loggingMap.put(
+          "shouldFilterMetrics", 
Boolean.parseBoolean(props.getProperty("org.apache.druid.java.util.emitter.logging.shouldFilterMetrics"))
+      );
+    }
+    if 
(props.containsKey("org.apache.druid.java.util.emitter.logging.allowedMetricsPath"))
 {
+      loggingMap.put(
+          "allowedMetricsPath", 
props.getProperty("org.apache.druid.java.util.emitter.logging.allowedMetricsPath")
+      );
+    }
     return loggingMap;
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitter.java
 
b/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitter.java
index afc8e07e2c2..17392515dd6 100644
--- 
a/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitter.java
+++ 
b/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitter.java
@@ -22,13 +22,26 @@ package org.apache.druid.java.util.emitter.core;
 /**
  */
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.slf4j.MarkerFactory;
 
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -36,22 +49,100 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class LoggingEmitter implements Emitter
 {
+  private static final Logger LOGGER = new Logger(LoggingEmitter.class);
+  private static final String DEFAULT_ALLOWED_METRICS_RESOURCE = 
"defaultMetrics.json";
+
   private final Logger log;
   private final Level level;
   private final ObjectMapper jsonMapper;
+  @Nullable
+  private final Set<String> allowedMetrics;
 
   private final AtomicBoolean started = new AtomicBoolean(false);
 
   public LoggingEmitter(LoggingEmitterConfig config, ObjectMapper jsonMapper)
   {
-    this(new Logger(config.getLoggerClass()), 
Level.toLevel(config.getLogLevel()), jsonMapper);
+    this(
+        new Logger(config.getLoggerClass()),
+        Level.toLevel(config.getLogLevel()),
+        jsonMapper,
+        config.shouldFilterMetrics(),
+        config.getAllowedMetricsPath()
+    );
   }
 
   public LoggingEmitter(Logger log, Level level, ObjectMapper jsonMapper)
+  {
+    this(log, level, jsonMapper, false, null);
+  }
+
+  public LoggingEmitter(
+      Logger log,
+      Level level,
+      ObjectMapper jsonMapper,
+      boolean shouldFilterMetrics,
+      @Nullable String allowedMetricsPath
+  )
   {
     this.log = log;
     this.level = level;
     this.jsonMapper = jsonMapper;
+    this.allowedMetrics = shouldFilterMetrics ? 
loadAllowedMetrics(allowedMetricsPath, jsonMapper) : null;
+  }
+
+  /**
+   * Loads the allowed metric names from a JSON file. If the path is null or 
empty,
+   * loads from the bundled classpath resource (defaultMetrics.json). If a 
custom
+   * path is provided but the file is missing, logs a warning and falls back to
+   * the default classpath resource.
+   */
+  private static Set<String> loadAllowedMetrics(@Nullable String path, 
ObjectMapper jsonMapper)
+  {
+    final InputStream is = openAllowedMetricsStream(path);
+    try {
+      final Map<String, Object> metricsMap = jsonMapper.readValue(is, new 
TypeReference<Map<String, Object>>() {});
+      return Collections.unmodifiableSet(metricsMap.keySet());
+    }
+    catch (IOException e) {
+      final String source = path == null || Strings.isNullOrEmpty(path) ? 
DEFAULT_ALLOWED_METRICS_RESOURCE : path;
+      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                          .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                          .build(e, "Allowed metrics file must be a JSON 
object with metric names as keys; failed to parse [%s]", source);
+    }
+  }
+
+  /**
+   * Opens the allowed metrics configuration stream. Uses classpath resource 
when
+   * path is null/empty. When a custom path is specified but the file is 
missing,
+   * logs a warning and falls back to the default classpath resource.
+   */
+  private static InputStream openAllowedMetricsStream(@Nullable String path)
+  {
+    if (Strings.isNullOrEmpty(path)) {
+      LOGGER.info("Using default allowed metrics configuration from classpath 
resource [%s]", DEFAULT_ALLOWED_METRICS_RESOURCE);
+      return openDefaultAllowedMetricsResource();
+    }
+    try {
+      final InputStream is = new FileInputStream(new File(path));
+      LOGGER.info("Using allowed metrics configuration at [%s]", path);
+      return is;
+    }
+    catch (FileNotFoundException e) {
+      LOGGER.warn(e, "Allowed metrics file [%s] not found, falling back to 
default classpath resource [%s]",
+                  path, DEFAULT_ALLOWED_METRICS_RESOURCE);
+      return openDefaultAllowedMetricsResource();
+    }
+  }
+
+  private static InputStream openDefaultAllowedMetricsResource()
+  {
+    final InputStream is = 
LoggingEmitter.class.getClassLoader().getResourceAsStream(DEFAULT_ALLOWED_METRICS_RESOURCE);
+    if (is == null) {
+      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                          .ofCategory(DruidException.Category.NOT_FOUND)
+                          .build("Could not find default allowed metrics 
resource [%s] on classpath", DEFAULT_ALLOWED_METRICS_RESOURCE);
+    }
+    return is;
   }
 
   @Override
@@ -95,6 +186,16 @@ public class LoggingEmitter implements Emitter
         throw new RejectedExecutionException("Service not started.");
       }
     }
+
+    // Allowlist filtering: only applies to ServiceMetricEvents.
+    // Non-metric events (alerts, etc.) always pass through.
+    if (allowedMetrics != null && event instanceof ServiceMetricEvent) {
+      final String metricName = ((ServiceMetricEvent) event).getMetric();
+      if (!allowedMetrics.contains(metricName)) {
+        return;
+      }
+    }
+
     try {
       switch (level) {
         case TRACE:
diff --git 
a/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfig.java
 
b/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfig.java
index 39b85126f77..7aa3d579749 100644
--- 
a/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfig.java
+++ 
b/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfig.java
@@ -21,6 +21,7 @@ package org.apache.druid.java.util.emitter.core;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import javax.annotation.Nullable;
 import javax.validation.constraints.NotNull;
 
 /**
@@ -35,6 +36,26 @@ public class LoggingEmitterConfig
   @JsonProperty
   private String logLevel = "info";
 
+  /**
+   * When true, only metrics listed in the allowed metrics configuration are 
emitted.
+   * If {@link #allowedMetricsPath} is null/empty, the bundled default 
allowlist
+   * (defaultMetrics.json on the classpath) is used. If a path is provided,
+   * the allowlist is loaded from that file instead (falling back to the default if the file is missing).
+   * Defaults to false (emit all metrics, backward-compatible behavior).
+   */
+  @JsonProperty("shouldFilterMetrics")
+  private boolean shouldFilterMetrics = false;
+
+  /**
+   * Optional path to a JSON file: an object whose keys are the allowed metric names.
+   * Only used when {@link #shouldFilterMetrics} is true.
+   * If null or empty, the bundled default resource (defaultMetrics.json) is 
loaded
+   * from the classpath, mirroring how the Prometheus emitter loads its 
defaultMetrics.json.
+   */
+  @JsonProperty
+  @Nullable
+  private String allowedMetricsPath = null;
+
   public String getLoggerClass()
   {
     return loggerClass;
@@ -45,12 +66,25 @@ public class LoggingEmitterConfig
     return logLevel;
   }
 
+  public boolean shouldFilterMetrics()
+  {
+    return shouldFilterMetrics;
+  }
+
+  @Nullable
+  public String getAllowedMetricsPath()
+  {
+    return allowedMetricsPath;
+  }
+
   @Override
   public String toString()
   {
     return "LoggingEmitterConfig{" +
            "loggerClass='" + loggerClass + '\'' +
            ", logLevel='" + logLevel + '\'' +
+           ", shouldFilterMetrics=" + shouldFilterMetrics +
+           ", allowedMetricsPath='" + allowedMetricsPath + '\'' +
            '}';
   }
 }
diff --git a/processing/src/main/resources/defaultMetrics.json 
b/processing/src/main/resources/defaultMetrics.json
new file mode 100644
index 00000000000..ea6c2fe6b5f
--- /dev/null
+++ b/processing/src/main/resources/defaultMetrics.json
@@ -0,0 +1,249 @@
+{
+    "compact/segmentAnalyzer/fetchAndProcessMillis": [],
+    "compact/task/count": [],
+    "compactTask/availableSlot/count": [],
+    "compactTask/maxSlot/count": [],
+    "coordinator/global/time": [],
+    "coordinator/time": [],
+    "groupBy/maxMergeDictionarySize": [],
+    "groupBy/maxSpilledBytes": [],
+    "groupBy/mergeDictionarySize": [],
+    "groupBy/spilledBytes": [],
+    "groupBy/spilledQueries": [],
+    "ingest/count": [],
+    "ingest/events/duplicate": [],
+    "ingest/events/messageGap": [],
+    "ingest/events/processed": [],
+    "ingest/events/processedWithError": [],
+    "ingest/events/thrownAway": [],
+    "ingest/events/unparseable": [],
+    "ingest/handoff/count": [],
+    "ingest/handoff/failed": [],
+    "ingest/handoff/time": [],
+    "ingest/input/bytes": [],
+    "ingest/kafka/avgLag": [],
+    "ingest/kafka/lag": [],
+    "ingest/kafka/maxLag": [],
+    "ingest/kafka/partitionLag": [],
+    "ingest/merge/cpu": [],
+    "ingest/merge/time": [],
+    "ingest/notices/queueSize": [],
+    "ingest/notices/time": [],
+    "ingest/pause/time": [],
+    "ingest/persists/backPressure": [],
+    "ingest/persists/count": [],
+    "ingest/persists/cpu": [],
+    "ingest/persists/failed": [],
+    "ingest/persists/time": [],
+    "ingest/rows/output": [],
+    "ingest/segments/count": [],
+    "ingest/sink/count": [],
+    "ingest/tombstones/count": [],
+    "interval/compacted/count": [],
+    "interval/skipCompact/count": [],
+    "interval/waitCompact/count": [],
+    "jetty/numOpenConnections": [],
+    "jetty/threadPool/busy": [],
+    "jetty/threadPool/idle": [],
+    "jetty/threadPool/isLowOnThreads": [],
+    "jetty/threadPool/max": [],
+    "jetty/threadPool/min": [],
+    "jetty/threadPool/queueSize": [],
+    "jetty/threadPool/ready": [],
+    "jetty/threadPool/total": [],
+    "jetty/threadPool/utilizationRate": [],
+    "jetty/threadPool/utilized": [],
+    "jvm/bufferpool/capacity": [],
+    "jvm/bufferpool/count": [],
+    "jvm/bufferpool/used": [],
+    "jvm/gc/count": [],
+    "jvm/gc/cpu": [],
+    "jvm/mem/committed": [],
+    "jvm/mem/init": [],
+    "jvm/mem/max": [],
+    "jvm/mem/used": [],
+    "jvm/pool/committed": [],
+    "jvm/pool/init": [],
+    "jvm/pool/max": [],
+    "jvm/pool/used": [],
+    "kafka/consumer/bytesConsumed": [],
+    "kafka/consumer/fetch": [],
+    "kafka/consumer/fetchLatencyAvg": [],
+    "kafka/consumer/fetchLatencyMax": [],
+    "kafka/consumer/fetchRate": [],
+    "kafka/consumer/fetchSizeAvg": [],
+    "kafka/consumer/fetchSizeMax": [],
+    "kafka/consumer/incomingBytes": [],
+    "kafka/consumer/outgoingBytes": [],
+    "kafka/consumer/recordsConsumed": [],
+    "kafka/consumer/recordsLag": [],
+    "kafka/consumer/recordsPerRequestAvg": [],
+    "kill/eligibleUnusedSegments/count": [],
+    "kill/pendingSegments/count": [],
+    "kill/task/count": [],
+    "killTask/availableSlot/count": [],
+    "killTask/maxSlot/count": [],
+    "mergeBuffer/acquisitionTimeNs": [],
+    "mergeBuffer/maxAcquisitionTimeNs": [],
+    "mergeBuffer/pendingRequests": [],
+    "mergeBuffer/queries": [],
+    "mergeBuffer/used": [],
+    "metadata/kill/audit/count": [],
+    "metadata/kill/compaction/count": [],
+    "metadata/kill/datasource/count": [],
+    "metadata/kill/rule/count": [],
+    "metadata/kill/supervisor/count": [],
+    "metadatacache/backfill/count": [],
+    "metadatacache/init/time": [],
+    "metadatacache/refresh/count": [],
+    "metadatacache/refresh/time": [],
+    "metadatacache/schemaPoll/count": [],
+    "metadatacache/schemaPoll/failed": [],
+    "metadatacache/schemaPoll/time": [],
+    "query/bytes": [],
+    "query/cache/delta/averageBytes": [],
+    "query/cache/delta/errors": [],
+    "query/cache/delta/evictions": [],
+    "query/cache/delta/hitRate": [],
+    "query/cache/delta/hits": [],
+    "query/cache/delta/misses": [],
+    "query/cache/delta/numEntries": [],
+    "query/cache/delta/put/error": [],
+    "query/cache/delta/put/ok": [],
+    "query/cache/delta/put/oversized": [],
+    "query/cache/delta/sizeBytes": [],
+    "query/cache/delta/timeouts": [],
+    "query/cache/total/averageBytes": [],
+    "query/cache/total/errors": [],
+    "query/cache/total/evictions": [],
+    "query/cache/total/hitRate": [],
+    "query/cache/total/hits": [],
+    "query/cache/total/misses": [],
+    "query/cache/total/numEntries": [],
+    "query/cache/total/put/error": [],
+    "query/cache/total/put/ok": [],
+    "query/cache/total/put/oversized": [],
+    "query/cache/total/sizeBytes": [],
+    "query/cache/total/timeouts": [],
+    "query/count": [],
+    "query/cpu/time": [],
+    "query/failed/count": [],
+    "query/interrupted/count": [],
+    "query/node/bytes": [],
+    "query/node/time": [],
+    "query/node/ttfb": [],
+    "query/segment/time": [],
+    "query/segmentAndCache/time": [],
+    "query/success/count": [],
+    "query/time": [],
+    "query/timeout/count": [],
+    "query/wait/time": [],
+    "schemacache/finalizedSchemaPayload/count": [],
+    "schemacache/finalizedSegmentMetadata/count": [],
+    "schemacache/inTransitSMQPublishedResults/count": [],
+    "schemacache/inTransitSMQResults/count": [],
+    "schemacache/realtime/count": [],
+    "segment/added/bytes": [],
+    "segment/assignSkipped/count": [],
+    "segment/assigned/count": [],
+    "segment/availableDeepStorageOnly/count": [],
+    "segment/compacted/bytes": [],
+    "segment/compacted/count": [],
+    "segment/count": [],
+    "segment/deleted/count": [],
+    "segment/dropQueue/count": [],
+    "segment/dropSkipped/count": [],
+    "segment/dropped/count": [],
+    "segment/loadQueue/assigned": [],
+    "segment/loadQueue/cancelled": [],
+    "segment/loadQueue/count": [],
+    "segment/loadQueue/failed": [],
+    "segment/loadQueue/size": [],
+    "segment/loadQueue/success": [],
+    "segment/max": [],
+    "segment/moveSkipped/count": [],
+    "segment/moved/bytes": [],
+    "segment/moved/count": [],
+    "segment/nuked/bytes": [],
+    "segment/overShadowed/count": [],
+    "segment/pendingDelete": [],
+    "segment/scan/active": [],
+    "segment/scan/pending": [],
+    "segment/size": [],
+    "segment/skipCompact/bytes": [],
+    "segment/skipCompact/count": [],
+    "segment/unavailable/count": [],
+    "segment/underReplicated/count": [],
+    "segment/unneeded/count": [],
+    "segment/unneededEternityTombstone/count": [],
+    "segment/used": [],
+    "segment/usedPercent": [],
+    "segment/waitCompact/bytes": [],
+    "segment/waitCompact/count": [],
+    "serverview/init/time": [],
+    "serverview/sync/healthy": [],
+    "serverview/sync/unstableTime": [],
+    "service/heartbeat": [],
+    "sqlQuery/bytes": [],
+    "sqlQuery/planningTimeMs": [],
+    "sqlQuery/time": [],
+    "sys/cpu": [],
+    "sys/disk/queue": [],
+    "sys/disk/read/count": [],
+    "sys/disk/read/size": [],
+    "sys/disk/transferTime": [],
+    "sys/disk/write/count": [],
+    "sys/disk/write/size": [],
+    "sys/fs/files/count": [],
+    "sys/fs/files/free": [],
+    "sys/fs/max": [],
+    "sys/fs/used": [],
+    "sys/mem/free": [],
+    "sys/mem/max": [],
+    "sys/mem/used": [],
+    "sys/net/read/dropped": [],
+    "sys/net/read/errors": [],
+    "sys/net/read/packets": [],
+    "sys/net/read/size": [],
+    "sys/net/write/collisions": [],
+    "sys/net/write/errors": [],
+    "sys/net/write/packets": [],
+    "sys/net/write/size": [],
+    "sys/storage/used": [],
+    "sys/swap/free": [],
+    "sys/swap/max": [],
+    "sys/swap/pageIn": [],
+    "sys/swap/pageOut": [],
+    "sys/tcpv4/activeOpens": [],
+    "sys/tcpv4/attemptFails": [],
+    "sys/tcpv4/estabResets": [],
+    "sys/tcpv4/in/errs": [],
+    "sys/tcpv4/in/segs": [],
+    "sys/tcpv4/out/rsts": [],
+    "sys/tcpv4/out/segs": [],
+    "sys/tcpv4/passiveOpens": [],
+    "sys/tcpv4/retrans/segs": [],
+    "sys/uptime": [],
+    "task/action/batch/attempts": [],
+    "task/action/batch/queueTime": [],
+    "task/action/batch/runTime": [],
+    "task/action/batch/size": [],
+    "task/action/failed/count": [],
+    "task/action/run/time": [],
+    "task/action/success/count": [],
+    "task/autoScaler/requiredCount": [],
+    "task/failed/count": [],
+    "task/pending/count": [],
+    "task/pending/time": [],
+    "task/run/time": [],
+    "task/running/count": [],
+    "task/segmentAvailability/wait/time": [],
+    "task/success/count": [],
+    "task/waiting/count": [],
+    "tier/historical/count": [],
+    "tier/replication/factor": [],
+    "tier/required/capacity": [],
+    "tier/total/capacity": [],
+    "zk/connected": [],
+    "zk/reconnect/time": []
+}
\ No newline at end of file
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfigTest.java
 
b/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfigTest.java
index 9aa93d5d0f5..b7189699b61 100644
--- 
a/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfigTest.java
+++ 
b/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfigTest.java
@@ -38,6 +38,8 @@ public class LoggingEmitterConfigTest
     );
     Assert.assertEquals("getLoggerClass", LoggingEmitter.class.getName(), 
config.getLoggerClass());
     Assert.assertEquals("getLogLevel", "info", config.getLogLevel());
+    Assert.assertFalse("shouldFilterMetrics", config.shouldFilterMetrics());
+    Assert.assertNull("getAllowedMetricsPath", config.getAllowedMetricsPath());
   }
 
   @Test
@@ -52,6 +54,8 @@ public class LoggingEmitterConfigTest
 
     Assert.assertEquals("getLoggerClass", LoggingEmitter.class.getName(), 
config.getLoggerClass());
     Assert.assertEquals("getLogLevel", "debug", config.getLogLevel());
+    Assert.assertFalse("shouldFilterMetrics", config.shouldFilterMetrics());
+    Assert.assertNull("getAllowedMetricsPath", config.getAllowedMetricsPath());
   }
 
   @Test
@@ -60,6 +64,8 @@ public class LoggingEmitterConfigTest
     final Properties props = new Properties();
     props.setProperty("org.apache.druid.java.util.emitter.loggerClass", "Foo");
     props.setProperty("org.apache.druid.java.util.emitter.logLevel", "INFO");
+    
props.setProperty("org.apache.druid.java.util.emitter.shouldFilterMetrics", 
"true");
+    props.setProperty("org.apache.druid.java.util.emitter.allowedMetricsPath", 
"/tmp/allowedMetrics.json");
 
     final ObjectMapper objectMapper = new ObjectMapper();
     final LoggingEmitterConfig config = objectMapper.convertValue(
@@ -69,6 +75,8 @@ public class LoggingEmitterConfigTest
 
     Assert.assertEquals("getLoggerClass", "Foo", config.getLoggerClass());
     Assert.assertEquals("getLogLevel", "INFO", config.getLogLevel());
+    Assert.assertTrue("shouldFilterMetrics", config.shouldFilterMetrics());
+    Assert.assertEquals("getAllowedMetricsPath", "/tmp/allowedMetrics.json", 
config.getAllowedMetricsPath());
   }
 
   @Test
@@ -77,6 +85,8 @@ public class LoggingEmitterConfigTest
     final Properties props = new Properties();
     props.setProperty("org.apache.druid.java.util.emitter.logging.class", 
"Foo");
     props.setProperty("org.apache.druid.java.util.emitter.logging.level", 
"INFO");
+    
props.setProperty("org.apache.druid.java.util.emitter.logging.shouldFilterMetrics",
 "true");
+    
props.setProperty("org.apache.druid.java.util.emitter.logging.allowedMetricsPath",
 "/custom/path.json");
 
     final ObjectMapper objectMapper = new ObjectMapper();
     final LoggingEmitterConfig config = objectMapper.convertValue(
@@ -86,5 +96,7 @@ public class LoggingEmitterConfigTest
 
     Assert.assertEquals("getLoggerClass", "Foo", config.getLoggerClass());
     Assert.assertEquals("getLogLevel", "INFO", config.getLogLevel());
+    Assert.assertTrue("shouldFilterMetrics", config.shouldFilterMetrics());
+    Assert.assertEquals("getAllowedMetricsPath", "/custom/path.json", 
config.getAllowedMetricsPath());
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterTest.java
 
b/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterTest.java
new file mode 100644
index 00000000000..b29cf44ef41
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.emitter.core;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.emitter.service.UnitEvent;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+
+public class LoggingEmitterTest
+{
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  private List<Object> serializedObjects;
+  private ObjectMapper trackingMapper;
+  private LoggingEmitter emitter;
+
+  @Before
+  public void setUp()
+  {
+    serializedObjects = new ArrayList<>();
+    // A custom ObjectMapper that records every object passed to 
writeValueAsString.
+    // This lets us detect which events actually reach the logging step (i.e., 
were NOT
+    // filtered out by the allowlist). We use Level.WARN because the WARN case 
in emit()
+    // calls writeValueAsString unconditionally (no isWarnEnabled guard), 
making it a
+    // reliable probe for whether an event passed the allowlist check.
+    trackingMapper = new ObjectMapper()
+    {
+      @Override
+      public String writeValueAsString(Object value) throws 
JsonProcessingException
+      {
+        serializedObjects.add(value);
+        return super.writeValueAsString(value);
+      }
+    };
+  }
+
+  private LoggingEmitter createEmitter(boolean shouldFilterMetrics, String 
allowedMetricsPath)
+  {
+    emitter = new LoggingEmitter(
+        new Logger(LoggingEmitter.class),
+        LoggingEmitter.Level.WARN,
+        trackingMapper,
+        shouldFilterMetrics,
+        allowedMetricsPath
+    );
+    emitter.start();
+    return emitter;
+  }
+
+  @After
+  public void tearDown()
+  {
+    if (emitter != null) {
+      emitter.close();
+      emitter = null;
+    }
+  }
+
+  /**
+   * Without filtering enabled, the emitter should log all events (backward 
compatibility).
+   */
+  @Test
+  public void testEmitAllWhenFilteringDisabled()
+  {
+    createEmitter(false, null);
+
+    emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 
100).build("test", "localhost"));
+    emitter.emit(ServiceMetricEvent.builder().setMetric("jvm/mem/used", 
512).build("test", "localhost"));
+    emitter.emit(ServiceMetricEvent.builder().setMetric("some/random/metric", 
1).build("test", "localhost"));
+
+    Assert.assertEquals("All events should be serialized (logged)", 3, 
serializedObjects.size());
+  }
+
+  /**
+   * With filtering enabled and no custom path, the default classpath resource
+   * (defaultMetrics.json) should be loaded. Metrics in the default list
+   * are emitted; unlisted metrics are dropped.
+   */
+  @Test
+  public void testFilterWithDefaultResource()
+  {
+    createEmitter(true, null);
+
+    // "query/time" is in the default allowed metrics list
+    emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 
100).build("test", "localhost"));
+    // "some/unlisted/metric" is NOT in the default list
+    
emitter.emit(ServiceMetricEvent.builder().setMetric("some/unlisted/metric", 
1).build("test", "localhost"));
+
+    Assert.assertEquals("Only the allowed metric should be serialized", 1, 
serializedObjects.size());
+  }
+
+  /**
+   * With filtering enabled and a custom file path, only metrics from that 
file are emitted.
+   */
+  @Test
+  public void testFilterWithCustomFilePath() throws IOException
+  {
+    final File allowFile = createAllowlistFile("{\"query/time\": [], 
\"query/bytes\": []}");
+    createEmitter(true, allowFile.getAbsolutePath());
+
+    emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 
100).build("test", "localhost"));
+    emitter.emit(ServiceMetricEvent.builder().setMetric("jvm/mem/used", 
512).build("test", "localhost"));
+    emitter.emit(ServiceMetricEvent.builder().setMetric("query/bytes", 
2048).build("test", "localhost"));
+
+    Assert.assertEquals("Only allowed metrics should be serialized", 2, 
serializedObjects.size());
+  }
+
+  /**
+   * Non-metric events (like UnitEvent) should always pass through the filter,
+   * even when filtering is enabled.
+   */
+  @Test
+  public void testNonMetricEventsAlwaysPassThrough() throws IOException
+  {
+    final File allowFile = createAllowlistFile("{\"query/time\": []}");
+    createEmitter(true, allowFile.getAbsolutePath());
+
+    // This is NOT a ServiceMetricEvent, so it should bypass the allowlist 
filter
+    emitter.emit(new UnitEvent("alerts", 42));
+
+    Assert.assertEquals("Non-metric events should bypass the allowlist 
filter", 1, serializedObjects.size());
+  }
+
+  /**
+   * When a custom path is specified but the file is missing, the emitter 
falls back
+   * to the default classpath resource and emits successfully.
+   */
+  @Test
+  public void testMissingCustomPathFallsBackToDefault()
+  {
+    createEmitter(true, "/nonexistent/path/to/allowedMetrics.json");
+
+    // Fallback to default should allow "query/time" (in default list) and 
drop unlisted metrics
+    emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 
100).build("test", "localhost"));
+    
emitter.emit(ServiceMetricEvent.builder().setMetric("some/unlisted/metric", 
1).build("test", "localhost"));
+
+    Assert.assertEquals("Fallback to default should allow listed metrics 
only", 1, serializedObjects.size());
+  }
+
+  /**
+   * An empty allowlist should block all metric events but still pass 
non-metric events.
+   */
+  @Test
+  public void testEmptyAllowlistBlocksAllMetrics() throws IOException
+  {
+    final File allowFile = createAllowlistFile("{}");
+    createEmitter(true, allowFile.getAbsolutePath());
+
+    emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 
100).build("test", "localhost"));
+    emitter.emit(new UnitEvent("alerts", 42));
+
+    Assert.assertEquals("Only non-metric event should pass through", 1, 
serializedObjects.size());
+  }
+
+  /**
+   * When shouldFilterMetrics is false, even if an allowedMetricsPath is 
provided, filtering is not applied.
+   */
+  @Test
+  public void testFilterDisabledIgnoresPath() throws IOException
+  {
+    final File allowFile = createAllowlistFile("{\"query/time\": []}");
+    createEmitter(false, allowFile.getAbsolutePath());
+
+    emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 
100).build("test", "localhost"));
+    emitter.emit(ServiceMetricEvent.builder().setMetric("jvm/mem/used", 
512).build("test", "localhost"));
+
+    Assert.assertEquals("All events should pass when filtering is disabled", 
2, serializedObjects.size());
+  }
+
+  private File createAllowlistFile(String jsonContent) throws IOException
+  {
+    final File file = tempFolder.newFile("allowedMetrics.json");
+    try (Writer writer = new 
OutputStreamWriter(Files.newOutputStream(file.toPath()), 
StandardCharsets.UTF_8)) {
+      writer.write(jsonContent);
+    }
+    return file;
+  }
+}
diff --git a/processing/src/test/resources/defaultMetrics.json 
b/processing/src/test/resources/defaultMetrics.json
new file mode 100644
index 00000000000..22a9d0329fc
--- /dev/null
+++ b/processing/src/test/resources/defaultMetrics.json
@@ -0,0 +1 @@
+{"compact/segmentAnalyzer/fetchAndProcessMillis":[],"compact/task/count":[],"compactTask/availableSlot/count":[],"compactTask/maxSlot/count":[],"coordinator/global/time":[],"coordinator/time":[],"groupBy/maxMergeDictionarySize":[],"groupBy/maxSpilledBytes":[],"groupBy/mergeDictionarySize":[],"groupBy/spilledBytes":[],"groupBy/spilledQueries":[],"ingest/count":[],"ingest/events/duplicate":[],"ingest/events/messageGap":[],"ingest/events/processed":[],"ingest/events/processedWithError":[],"
 [...]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to