This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch branch_10x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_10x by this push:
     new 4f9406a23c3 SOLR-18060: Add Prometheus metrics to CrossDC Consumer. 
(cherry-pick from #4063) (#4102)
4f9406a23c3 is described below

commit 4f9406a23c316ebdb046b2506c0c764ed5fae151
Author: Andrzej BiaƂecki <[email protected]>
AuthorDate: Wed Feb 4 15:50:41 2026 +0100

    SOLR-18060: Add Prometheus metrics to CrossDC Consumer. (cherry-pick from 
#4063) (#4102)
---
 changelog/unreleased/solr-18060.yml                |   9 +
 .../apache/solr/handler/admin/MetricsHandler.java  |   2 +-
 .../solr/response/PrometheusResponseWriter.java    |   4 +-
 solr/cross-dc-manager/build.gradle                 |   9 +-
 solr/cross-dc-manager/gradle.lockfile              |  52 +++-
 .../solr/crossdc/manager/consumer/Consumer.java    |  23 +-
 .../crossdc/manager/consumer/ConsumerMetrics.java  | 171 ++++++++++
 .../manager/consumer/KafkaCrossDcConsumer.java     |  49 +--
 .../crossdc/manager/consumer/MetricsServlet.java   |  69 +++++
 .../solr/crossdc/manager/consumer/OtelMetrics.java | 182 +++++++++++
 .../solr/crossdc/manager/consumer/ThreadDump.java  | 142 +++++++++
 .../manager/consumer/ThreadDumpServlet.java        |  76 +++++
 .../apache/solr/crossdc/manager/consumer/Util.java |  55 ++--
 .../messageprocessor/SolrMessageProcessor.java     |  40 +--
 .../crossdc/manager/DeleteByQueryToIdTest.java     |   8 +-
 .../crossdc/manager/SimpleSolrIntegrationTest.java |   4 +-
 .../manager/SolrAndKafkaIntegrationTest.java       |  46 ++-
 .../manager/consumer/KafkaCrossDcConsumerTest.java |   8 +-
 .../messageprocessor/SolrMessageProcessorTest.java |   4 +-
 .../messageprocessor/TestMessageProcessor.java     |   5 +-
 solr/licenses/metrics-healthchecks-4.2.26.jar.sha1 |   1 -
 .../metrics-jakarta-servlets-4.2.26.jar.sha1       |   1 -
 solr/licenses/metrics-json-4.2.26.jar.sha1         |   1 -
 solr/licenses/metrics-jvm-4.2.26.jar.sha1          |   1 -
 solr/licenses/metrics-jvm-LICENSE-ASL.txt          | 203 ------------
 solr/licenses/metrics-jvm-NOTICE.txt               |  12 -
 ...tcnative-boringssl-static-2.0.73.Final.jar.sha1 |   1 +
 .../netty-tcnative-classes-2.0.73.Final.jar.sha1   |   1 +
 solr/licenses/profiler-1.1.1.jar.sha1              |   1 -
 solr/licenses/profiler-LICENSE-ASL.txt             | 345 ---------------------
 solr/licenses/profiler-NOTICE.txt                  |   0
 .../processor/KafkaRequestMirroringHandler.java    |   4 +-
 32 files changed, 845 insertions(+), 684 deletions(-)

diff --git a/changelog/unreleased/solr-18060.yml 
b/changelog/unreleased/solr-18060.yml
new file mode 100644
index 00000000000..773805a8bbf
--- /dev/null
+++ b/changelog/unreleased/solr-18060.yml
@@ -0,0 +1,9 @@
+# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
+title: CrossDC Consumer - add Prometheus metrics
+type: added
+authors:
+  - name: Andrzej Bialecki
+    nick: ab
+links:
+  - name: SOLR-18060
+    url: https://issues.apache.org/jira/browse/SOLR-18060
diff --git 
a/solr/core/src/java/org/apache/solr/handler/admin/MetricsHandler.java 
b/solr/core/src/java/org/apache/solr/handler/admin/MetricsHandler.java
index 13b4d044c7e..a0ab70e07fd 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/MetricsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/MetricsHandler.java
@@ -133,7 +133,7 @@ public class MetricsHandler extends RequestHandlerBase 
implements PermissionName
     }
   }
 
-  private void handleRequest(SolrParams params, BiConsumer<String, Object> 
consumer) {
+  public void handleRequest(SolrParams params, BiConsumer<String, Object> 
consumer) {
     if (!enabled) {
       consumer.accept("error", "metrics collection is disabled");
       return;
diff --git 
a/solr/core/src/java/org/apache/solr/response/PrometheusResponseWriter.java 
b/solr/core/src/java/org/apache/solr/response/PrometheusResponseWriter.java
index 01a2af19421..610fbaa8df2 100644
--- a/solr/core/src/java/org/apache/solr/response/PrometheusResponseWriter.java
+++ b/solr/core/src/java/org/apache/solr/response/PrometheusResponseWriter.java
@@ -34,8 +34,8 @@ import org.apache.solr.request.SolrQueryRequest;
 public class PrometheusResponseWriter implements QueryResponseWriter {
   // not TextQueryResponseWriter because Prometheus libs work with an 
OutputStream
 
-  private static final String CONTENT_TYPE_PROMETHEUS = "text/plain; 
version=0.0.4";
-  private static final String CONTENT_TYPE_OPEN_METRICS =
+  public static final String CONTENT_TYPE_PROMETHEUS = "text/plain; 
version=0.0.4";
+  public static final String CONTENT_TYPE_OPEN_METRICS =
       "application/openmetrics-text; version=1.0.0; charset=utf-8";
 
   @Override
diff --git a/solr/cross-dc-manager/build.gradle 
b/solr/cross-dc-manager/build.gradle
index 6d128614612..4ce538f67f7 100644
--- a/solr/cross-dc-manager/build.gradle
+++ b/solr/cross-dc-manager/build.gradle
@@ -24,14 +24,18 @@ description = 'Cross-DC Manager'
 
 dependencies {
   implementation platform(project(':platform'))
+  implementation project(':solr:core')
   implementation project(':solr:solrj')
   implementation project(':solr:solrj-zookeeper')
   implementation project(':solr:modules:cross-dc')
+  implementation project(':solr:modules:opentelemetry')
 
-  implementation libs.dropwizard.metrics.core
-  implementation libs.dropwizard.metrics.servlets
+  implementation platform(libs.opentelemetry.bom)
+  implementation libs.opentelemetry.api
+  implementation libs.opentelemetry.sdk.metrics
   implementation libs.eclipse.jetty.server
   implementation libs.eclipse.jetty.ee10.servlet
+  implementation libs.jakarta.servlet.api
   implementation libs.slf4j.api
   runtimeOnly libs.google.protobuf.javautils
   runtimeOnly libs.commonscodec.commonscodec
@@ -49,6 +53,7 @@ dependencies {
   testImplementation project(':solr:solrj-jetty')
   testImplementation libs.apache.lucene.testframework
   testImplementation libs.carrotsearch.randomizedtesting.runner
+  testImplementation libs.commonsio.commonsio
   testImplementation libs.junit.junit
   // The explicit dependency on bytebuddy is required for Java 25 support
   // Once Mockito upgrades its dependency on ByteBuddy to 1.16.1, we should
diff --git a/solr/cross-dc-manager/gradle.lockfile 
b/solr/cross-dc-manager/gradle.lockfile
index 6d9a1f75569..ca8999d0c32 100644
--- a/solr/cross-dc-manager/gradle.lockfile
+++ b/solr/cross-dc-manager/gradle.lockfile
@@ -4,8 +4,8 @@
 
com.carrotsearch.randomizedtesting:randomizedtesting-runner:2.8.3=jarValidation,testCompileClasspath,testRuntimeClasspath
 
com.carrotsearch:hppc:0.10.0=jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testRuntimeClasspath
 
com.fasterxml.jackson.core:jackson-annotations:2.20=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testCompileClasspath,testRuntimeClasspath
-com.fasterxml.jackson.core:jackson-core:2.20.1=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testCompileClasspath,testRuntimeClasspath
-com.fasterxml.jackson.core:jackson-databind:2.20.1=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testCompileClasspath,testRuntimeClasspath
+com.fasterxml.jackson.core:jackson-core:2.20.1=jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testRuntimeClasspath
+com.fasterxml.jackson.core:jackson-databind:2.20.1=jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testRuntimeClasspath
 
com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.20.1=jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testRuntimeClasspath
 
com.fasterxml.jackson.dataformat:jackson-dataformat-csv:2.20.1=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
 
com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.20.1=jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testRuntimeClasspath
@@ -17,6 +17,8 @@ 
com.fasterxml.woodstox:woodstox-core:7.0.0=jarValidation,runtimeClasspath,runtim
 
com.github.ben-manes.caffeine:caffeine:3.2.2=annotationProcessor,errorprone,jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testAnnotationProcessor,testRuntimeClasspath
 
com.github.kevinstern:software-and-algorithms:1.0=annotationProcessor,errorprone,testAnnotationProcessor
 
com.github.luben:zstd-jni:1.5.6-4=jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testRuntimeClasspath
+com.google.android:annotations:4.1.1.4=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
+com.google.api.grpc:proto-google-common-protos:2.61.1=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
 
com.google.auto.service:auto-service-annotations:1.0.1=annotationProcessor,errorprone,testAnnotationProcessor
 
com.google.auto.value:auto-value-annotations:1.11.0=annotationProcessor,errorprone,testAnnotationProcessor
 
com.google.auto:auto-common:1.2.2=annotationProcessor,errorprone,testAnnotationProcessor
@@ -32,10 +34,13 @@ 
com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava=anno
 
com.google.j2objc:j2objc-annotations:3.1=annotationProcessor,compileClasspath,errorprone,jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testAnnotationProcessor,testCompileClasspath,testRuntimeClasspath
 
com.google.protobuf:protobuf-java-util:3.25.8=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
 
com.google.protobuf:protobuf-java:3.25.8=annotationProcessor,errorprone,jarValidation,runtimeClasspath,runtimeLibs,testAnnotationProcessor,testRuntimeClasspath
-com.helger:profiler:1.1.1=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,testCompileClasspath,testRuntimeClasspath
 
com.j256.simplemagic:simplemagic:1.17=jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testRuntimeClasspath
 
com.jayway.jsonpath:json-path:2.9.0=jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testRuntimeClasspath
 
com.lmax:disruptor:3.4.4=jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testRuntimeClasspath
+com.squareup.okhttp3:okhttp-jvm:5.3.0=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
+com.squareup.okhttp3:okhttp:5.3.0=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
+com.squareup.okio:okio-jvm:3.16.2=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
+com.squareup.okio:okio:3.16.2=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
 
com.tdunning:t-digest:3.3=jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testRuntimeClasspath
 
com.thoughtworks.paranamer:paranamer:2.8.3=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
 
com.typesafe.scala-logging:scala-logging_2.13:3.9.5=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
@@ -48,22 +53,34 @@ 
commons-digester:commons-digester:2.1=jarValidation,runtimeClasspath,runtimeLibs
 
commons-io:commons-io:2.20.0=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testCompileClasspath,testRuntimeClasspath
 
commons-validator:commons-validator:1.7=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
 
io.dropwizard.metrics:metrics-annotation:4.2.26=jarValidation,testRuntimeClasspath
-io.dropwizard.metrics:metrics-core:4.2.26=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testCompileClasspath,testRuntimeClasspath
-io.dropwizard.metrics:metrics-healthchecks:4.2.26=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,testCompileClasspath,testRuntimeClasspath
-io.dropwizard.metrics:metrics-jakarta-servlets:4.2.26=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,testCompileClasspath,testRuntimeClasspath
+io.dropwizard.metrics:metrics-core:4.2.26=jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testRuntimeClasspath
 
io.dropwizard.metrics:metrics-jetty12-ee10:4.2.26=jarValidation,testRuntimeClasspath
 io.dropwizard.metrics:metrics-jetty12:4.2.26=jarValidation,testRuntimeClasspath
-io.dropwizard.metrics:metrics-json:4.2.26=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,testCompileClasspath,testRuntimeClasspath
-io.dropwizard.metrics:metrics-jvm:4.2.26=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,testCompileClasspath,testRuntimeClasspath
 
io.github.eisop:dataflow-errorprone:3.41.0-eisop1=annotationProcessor,errorprone,testAnnotationProcessor
 
io.github.java-diff-utils:java-diff-utils:4.12=annotationProcessor,errorprone,testAnnotationProcessor
+io.grpc:grpc-api:1.65.1=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
+io.grpc:grpc-context:1.65.1=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
+io.grpc:grpc-core:1.65.1=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
+io.grpc:grpc-netty:1.65.1=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
+io.grpc:grpc-protobuf-lite:1.65.1=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
+io.grpc:grpc-protobuf:1.65.1=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
+io.grpc:grpc-stub:1.65.1=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
+io.grpc:grpc-util:1.65.1=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
+io.netty:netty-bom:4.2.6.Final=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
 
io.netty:netty-buffer:4.2.6.Final=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testCompileClasspath,testRuntimeClasspath
 
io.netty:netty-codec-base:4.2.6.Final=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testCompileClasspath,testRuntimeClasspath
+io.netty:netty-codec-compression:4.2.6.Final=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
+io.netty:netty-codec-http2:4.2.6.Final=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
+io.netty:netty-codec-http:4.2.6.Final=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
+io.netty:netty-codec-socks:4.2.6.Final=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
 
io.netty:netty-common:4.2.6.Final=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testCompileClasspath,testRuntimeClasspath
+io.netty:netty-handler-proxy:4.2.6.Final=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
 
io.netty:netty-handler:4.2.6.Final=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testCompileClasspath,testRuntimeClasspath
 
io.netty:netty-resolver:4.2.6.Final=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testCompileClasspath,testRuntimeClasspath
-io.netty:netty-tcnative-boringssl-static:2.0.70.Final=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testCompileClasspath,testRuntimeClasspath
-io.netty:netty-tcnative-classes:2.0.70.Final=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testCompileClasspath,testRuntimeClasspath
+io.netty:netty-tcnative-boringssl-static:2.0.70.Final=compileClasspath,solrPlatformLibs,testCompileClasspath
+io.netty:netty-tcnative-boringssl-static:2.0.73.Final=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
+io.netty:netty-tcnative-classes:2.0.70.Final=compileClasspath,solrPlatformLibs,testCompileClasspath
+io.netty:netty-tcnative-classes:2.0.73.Final=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
 
io.netty:netty-transport-classes-epoll:4.2.6.Final=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testCompileClasspath,testRuntimeClasspath
 
io.netty:netty-transport-native-epoll:4.2.6.Final=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testCompileClasspath,testRuntimeClasspath
 
io.netty:netty-transport-native-unix-common:4.2.6.Final=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testCompileClasspath,testRuntimeClasspath
@@ -75,13 +92,22 @@ 
io.opentelemetry.instrumentation:opentelemetry-runtime-telemetry-java8:2.22.0-al
 
io.opentelemetry.semconv:opentelemetry-semconv:1.37.0=jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testRuntimeClasspath
 
io.opentelemetry:opentelemetry-api-incubator:1.56.0-alpha=jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testRuntimeClasspath
 
io.opentelemetry:opentelemetry-api:1.56.0=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testCompileClasspath,testRuntimeClasspath
+io.opentelemetry:opentelemetry-bom:1.56.0=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,testCompileClasspath,testRuntimeClasspath
 
io.opentelemetry:opentelemetry-common:1.56.0=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testCompileClasspath,testRuntimeClasspath
 
io.opentelemetry:opentelemetry-context:1.56.0=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testCompileClasspath,testRuntimeClasspath
+io.opentelemetry:opentelemetry-exporter-common:1.56.0=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
+io.opentelemetry:opentelemetry-exporter-otlp-common:1.56.0=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
+io.opentelemetry:opentelemetry-exporter-otlp:1.56.0=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
 
io.opentelemetry:opentelemetry-exporter-prometheus:1.56.0-alpha=jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testRuntimeClasspath
-io.opentelemetry:opentelemetry-sdk-common:1.56.0=jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testRuntimeClasspath
-io.opentelemetry:opentelemetry-sdk-metrics:1.56.0=jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testRuntimeClasspath
+io.opentelemetry:opentelemetry-exporter-sender-okhttp:1.56.0=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
+io.opentelemetry:opentelemetry-sdk-common:1.56.0=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testCompileClasspath,testRuntimeClasspath
+io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi:1.56.0=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
+io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:1.56.0=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
+io.opentelemetry:opentelemetry-sdk-logs:1.56.0=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
+io.opentelemetry:opentelemetry-sdk-metrics:1.56.0=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testCompileClasspath,testRuntimeClasspath
 
io.opentelemetry:opentelemetry-sdk-trace:1.56.0=jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testRuntimeClasspath
 
io.opentelemetry:opentelemetry-sdk:1.56.0=jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testRuntimeClasspath
+io.perfmark:perfmark-api:0.27.0=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
 
io.prometheus:prometheus-metrics-exposition-formats:1.1.0=jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testRuntimeClasspath
 
io.prometheus:prometheus-metrics-model:1.1.0=jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testRuntimeClasspath
 
io.sgr:s2-geometry-library-java:1.0.0=jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testRuntimeClasspath
@@ -191,6 +217,8 @@ 
org.glassfish.jersey.inject:jersey-hk2:3.1.11=jarValidation,runtimeClasspath,run
 
org.glassfish.jersey.media:jersey-media-json-jackson:3.1.11=jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testRuntimeClasspath
 
org.hamcrest:hamcrest:3.0=jarValidation,testCompileClasspath,testRuntimeClasspath
 
org.javassist:javassist:3.30.2-GA=jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testRuntimeClasspath
+org.jetbrains.kotlin:kotlin-stdlib:2.2.21=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
+org.jetbrains:annotations:26.0.2=jarValidation,runtimeClasspath,runtimeLibs,testRuntimeClasspath
 
org.jspecify:jspecify:1.0.0=annotationProcessor,compileClasspath,errorprone,jarValidation,runtimeClasspath,runtimeLibs,solrPlatformLibs,testAnnotationProcessor,testCompileClasspath,testRuntimeClasspath
 org.junit.jupiter:junit-jupiter-api:5.6.2=jarValidation,testRuntimeClasspath
 
org.junit.platform:junit-platform-commons:1.6.2=jarValidation,testRuntimeClasspath
diff --git 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/Consumer.java
 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/Consumer.java
index 7e4219f4922..ca3d2a16532 100644
--- 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/Consumer.java
+++ 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/Consumer.java
@@ -17,12 +17,8 @@
 package org.apache.solr.crossdc.manager.consumer;
 
 import static org.apache.solr.crossdc.common.KafkaCrossDcConf.PORT;
-import static org.apache.solr.crossdc.common.KafkaCrossDcConf.TOPIC_NAME;
 import static 
org.apache.solr.crossdc.common.KafkaCrossDcConf.ZK_CONNECT_STRING;
 
-import com.codahale.metrics.SharedMetricRegistries;
-import io.dropwizard.metrics.servlets.MetricsServlet;
-import io.dropwizard.metrics.servlets.ThreadDumpServlet;
 import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.Map;
@@ -46,8 +42,6 @@ import org.slf4j.LoggerFactory;
 public class Consumer {
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  public static final String METRICS_REGISTRY = "metrics";
-
   private Server server;
   private CrossDcConsumer crossDcConsumer;
 
@@ -84,11 +78,10 @@ public class Consumer {
 
     ConfUtil.verifyProperties(properties);
 
-    String bootstrapServers = (String) 
properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS);
-    String topicName = (String) properties.get(TOPIC_NAME);
+    OtelMetrics metrics = new OtelMetrics();
 
     KafkaCrossDcConf conf = new KafkaCrossDcConf(properties);
-    crossDcConsumer = getCrossDcConsumer(conf, startLatch);
+    crossDcConsumer = getCrossDcConsumer(conf, metrics, startLatch);
 
     // jetty endpoint for /metrics
     int port = conf.getInt(PORT);
@@ -98,11 +91,12 @@ public class Consumer {
       ServletContextHandler context = new 
ServletContextHandler(ServletContextHandler.NO_SESSIONS);
       context.setContextPath("/");
       server.setHandler(context);
+
       context.addServlet(ThreadDumpServlet.class, "/threads/*");
-      context.addServlet(MetricsServlet.class, "/metrics/*");
       context.setAttribute(
-          "com.codahale.metrics.servlets.MetricsServlet.registry",
-          SharedMetricRegistries.getOrCreate(METRICS_REGISTRY));
+          MetricsServlet.SOLR_METRICS_MANAGER_ATTRIBUTE, 
metrics.getMetricManager());
+      context.addServlet(MetricsServlet.class, "/metrics/*");
+
       for (ServletMapping mapping : 
context.getServletHandler().getServletMappings()) {
         if (log.isInfoEnabled()) {
           log.info(" - {}", mapping.getPathSpecs()[0]);
@@ -153,8 +147,9 @@ public class Consumer {
     }
   }
 
-  protected CrossDcConsumer getCrossDcConsumer(KafkaCrossDcConf conf, 
CountDownLatch startLatch) {
-    return new KafkaCrossDcConsumer(conf, startLatch);
+  protected CrossDcConsumer getCrossDcConsumer(
+      KafkaCrossDcConf conf, ConsumerMetrics metrics, CountDownLatch 
startLatch) {
+    return new KafkaCrossDcConsumer(conf, metrics, startLatch);
   }
 
   public static void main(String[] args) {
diff --git 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/ConsumerMetrics.java
 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/ConsumerMetrics.java
new file mode 100644
index 00000000000..92d96fae866
--- /dev/null
+++ 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/ConsumerMetrics.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.manager.consumer;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+
+/**
+ * Interface for tracking and recording metrics related to the processing of 
messages and requests
+ * in the {@link Consumer}. Provides methods to increment counters, record 
timing metrics, and
+ * capture other performance-related data points.
+ */
+public interface ConsumerMetrics {
+
+  /** No-op implementation of {@link ConsumerMetrics}. */
+  ConsumerMetrics NOOP =
+      new ConsumerMetrics() {
+        @Override
+        public void incrementCollapsedCounter() {}
+
+        @Override
+        public void incrementInputMsgCounter(long delta) {}
+
+        @Override
+        public void incrementInputReqCounter(String type, String subType, int 
delta) {}
+
+        @Override
+        public void incrementOutputCounter(String type, String result, int 
delta) {}
+
+        @Override
+        public void recordOutputBatchSize(
+            MirroredSolrRequest.Type type, SolrRequest<?> solrRequest) {}
+
+        @Override
+        public void recordOutputBackoffTime(MirroredSolrRequest.Type type, 
long backoffTimeMs) {}
+
+        @Override
+        public void recordOutputFirstAttemptTime(
+            MirroredSolrRequest.Type type, long firstAttemptTimeMs) {}
+
+        @Override
+        public ConsumerTimer startOutputTimeTimer(String requestType) {
+          return () -> 0;
+        }
+      };
+
+  /**
+   * Represents a timer interface used for measuring and observing the 
duration of tasks. Start
+   * measuring elapsed time when created.
+   */
+  interface ConsumerTimer {
+    /** Return the elapsed time in milliseconds. */
+    double observeDuration();
+
+    /** Close this timer. */
+    default void close() {
+      observeDuration();
+    }
+  }
+
+  /** Increments the counter for input messages. */
+  default void incrementInputMsgCounter() {
+    incrementInputMsgCounter(1L);
+  }
+
+  /**
+   * Increments the counter for input messages.
+   *
+   * @param delta increase the counter by this value
+   */
+  void incrementInputMsgCounter(long delta);
+
+  /** Increments the counter for collapsed "add" requests. */
+  void incrementCollapsedCounter();
+
+  /**
+   * Increments the counter for input requests by type and subtype.
+   *
+   * @param type request type, one of {@link
+   *     org.apache.solr.crossdc.common.MirroredSolrRequest.Type} values.
+   * @param subType additional subtype: add, delete_by_id, delete_by_query, or 
action for other
+   *     request types.
+   */
+  default void incrementInputReqCounter(String type, String subType) {
+    incrementInputReqCounter(type, subType, 1);
+  }
+
+  /**
+   * Increments the counter for input requests by type, subtype, and delta.
+   *
+   * @param type request type, one of {@link
+   *     org.apache.solr.crossdc.common.MirroredSolrRequest.Type} values.
+   * @param subType additional subtype: add, delete_by_id, delete_by_query, or 
action for other
+   *     request types.
+   * @param delta increase the counter by this value
+   */
+  void incrementInputReqCounter(String type, String subType, int delta);
+
+  /**
+   * Increments the counter for output requests by type and result.
+   *
+   * @param type the type of the request
+   * @param result the result of the request, such as success or failure
+   */
+  default void incrementOutputCounter(String type, String result) {
+    incrementOutputCounter(type, result, 1);
+  }
+
+  /**
+   * Increments the counter for output requests by type, result, and delta.
+   *
+   * @param type the type of the request
+   * @param result the result of the request, such as success or failure
+   * @param delta the value by which the counter should be increased
+   */
+  void incrementOutputCounter(String type, String result, int delta);
+
+  /**
+   * Records the batch size of the output request. Batch size is defined as 
the number of operations
+   * in an output {@link SolrRequest} (which may be different than the input 
size due to
+   * collapsing).
+   *
+   * @param type the type of the request, corresponding to one of the {@link
+   *     MirroredSolrRequest#getType()} values
+   * @param solrRequest SolrRequest object for which the batch size is being 
recorded
+   */
+  void recordOutputBatchSize(MirroredSolrRequest.Type type, SolrRequest<?> 
solrRequest);
+
+  /**
+   * Records the backoff time for output requests. Backoff time represents the 
delay before the next
+   * retry for the specified request type.
+   *
+   * @param type the type of the request, corresponding to one of the {@link
+   *     MirroredSolrRequest#getType()} values.
+   * @param backoffTimeMs the backoff time in milliseconds.
+   */
+  void recordOutputBackoffTime(MirroredSolrRequest.Type type, long 
backoffTimeMs);
+
+  /**
+   * Records the latency between the time when the message was sent at source 
and the time of the
+   * first attempt at processing.
+   *
+   * @param type the type of the request, corresponding to one of the {@link
+   *     MirroredSolrRequest#getType()} values
+   * @param firstAttemptTimeMs the latency of the first attempt in 
milliseconds.
+   */
+  void recordOutputFirstAttemptTime(MirroredSolrRequest.Type type, long 
firstAttemptTimeMs);
+
+  /**
+   * Starts a timer to measure the duration of an output request processing by 
the given request
+   * type.
+   *
+   * @param requestType the type of the request for which the timer is started
+   * @return a {@link ConsumerTimer} that allows to measure the elapsed time
+   */
+  ConsumerTimer startOutputTimeTimer(String requestType);
+}
diff --git 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
index dd7aabef25a..19a8a4d115c 100644
--- 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
+++ 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
@@ -16,8 +16,6 @@
  */
 package org.apache.solr.crossdc.manager.consumer;
 
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.SharedMetricRegistries;
 import java.lang.invoke.MethodHandles;
 import java.time.Duration;
 import java.util.Arrays;
@@ -73,9 +71,6 @@ import org.slf4j.LoggerFactory;
 public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private final MetricRegistry metrics =
-      SharedMetricRegistries.getOrCreate(Consumer.METRICS_REGISTRY);
-
   private final KafkaConsumer<String, MirroredSolrRequest<?>> kafkaConsumer;
   private final CountDownLatch startLatch;
   KafkaMirroringSink kafkaMirroringSink;
@@ -86,6 +81,7 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
   private final CrossDcConf.CollapseUpdates collapseUpdates;
   private final int maxCollapseRecords;
   private final SolrMessageProcessor messageProcessor;
+  protected final ConsumerMetrics metrics;
 
   protected SolrClientSupplier solrClientSupplier;
 
@@ -163,8 +159,9 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
    * @param conf The Kafka consumer configuration
    * @param startLatch To inform the caller when the Consumer has started
    */
-  public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch 
startLatch) {
-
+  public KafkaCrossDcConsumer(
+      KafkaCrossDcConf conf, ConsumerMetrics metrics, CountDownLatch 
startLatch) {
+    this.metrics = metrics;
     this.topicNames = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",");
     this.maxAttempts = conf.getInt(KafkaCrossDcConf.MAX_ATTEMPTS);
     this.collapseUpdates =
@@ -239,7 +236,7 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
   }
 
   protected SolrMessageProcessor createSolrMessageProcessor() {
-    return new SolrMessageProcessor(solrClientSupplier, resubmitRequest -> 0L);
+    return new SolrMessageProcessor(metrics, solrClientSupplier, 
resubmitRequest -> 0L);
   }
 
   public KafkaConsumer<String, MirroredSolrRequest<?>> 
createKafkaConsumer(Properties properties) {
@@ -341,11 +338,17 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
                   requestRecord.value());
             }
 
+            metrics.incrementInputMsgCounter();
             lastRecord = requestRecord;
             MirroredSolrRequest<?> req = requestRecord.value();
             SolrRequest<?> solrReq = req.getSolrRequest();
             MirroredSolrRequest.Type type = req.getType();
-            metrics.counter(MetricRegistry.name(type.name(), "input")).inc();
+
+            if (type != MirroredSolrRequest.Type.UPDATE) {
+              String action = solrReq.getParams().get("action", "unknown");
+              metrics.incrementInputReqCounter(type.name(), action);
+            }
+
             ModifiableSolrParams params = new 
ModifiableSolrParams(solrReq.getParams());
             if (log.isTraceEnabled()) {
               log.trace("-- picked type={}, params={}", req.getType(), params);
@@ -398,7 +401,7 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
                 if (collapseUpdates == CrossDcConf.CollapseUpdates.PARTIAL && 
hasDeletes) {
                   throw new RuntimeException("Can't collapse requests with 
deletions.");
                 }
-                metrics.counter(MetricRegistry.name(type.name(), 
"collapsed")).inc();
+                metrics.incrementCollapsedCounter();
                 currentCollapsed++;
               }
               UpdateRequest update = (UpdateRequest) solrReq;
@@ -408,19 +411,20 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
               List<SolrInputDocument> docs = update.getDocuments();
               if (docs != null) {
                 updateReqBatch.add(docs);
-                metrics.counter(MetricRegistry.name(type.name(), 
"add")).inc(docs.size());
+                metrics.incrementInputReqCounter(type.name(), "add", 
docs.size());
               }
               List<String> deletes = update.getDeleteById();
               if (deletes != null) {
                 updateReqBatch.deleteById(deletes);
-                metrics.counter(MetricRegistry.name(type.name(), 
"dbi")).inc(deletes.size());
+                metrics.incrementInputReqCounter(type.name(), "delete_by_id", 
deletes.size());
               }
               List<String> deleteByQuery = update.getDeleteQuery();
               if (deleteByQuery != null) {
                 for (String delByQuery : deleteByQuery) {
                   updateReqBatch.deleteByQuery(delByQuery);
                 }
-                metrics.counter(MetricRegistry.name(type.name(), 
"dbq")).inc(deleteByQuery.size());
+                metrics.incrementInputReqCounter(
+                    type.name(), "delete_by_query", deleteByQuery.size());
               }
             } else {
               // non-update requests should be sent immediately
@@ -505,6 +509,7 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
                 final IQueueHandler.Result<MirroredSolrRequest<?>> result =
                     messageProcessor.handleItem(mirroredSolrRequest);
 
+                metrics.recordOutputBatchSize(type, solrReqBatch);
                 processResult(type, result);
               } catch (MirroringException e) {
                 // We don't really know what to do here
@@ -534,10 +539,10 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
                 "Sending message to dead letter queue because of max attempts 
limit with current value = {}",
                 attempt);
             kafkaMirroringSink.submitToDlq(item);
-            metrics.counter(MetricRegistry.name(type.name(), 
"failed-dlq")).inc();
+            metrics.incrementOutputCounter(type.name(), "failed_dlq");
           } else {
             kafkaMirroringSink.submit(item);
-            metrics.counter(MetricRegistry.name(type.name(), 
"failed-resubmit")).inc();
+            metrics.incrementOutputCounter(type.name(), "failed_resubmit");
           }
         } catch (Exception e) {
           log.error(
@@ -552,18 +557,18 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
         if (log.isTraceEnabled()) {
           log.trace("result=handled");
         }
-        metrics.counter(MetricRegistry.name(type.name(), "handled")).inc();
+        metrics.incrementOutputCounter(type.name(), "handled");
         break;
       case NOT_HANDLED_SHUTDOWN:
         if (log.isTraceEnabled()) {
-          log.trace("result=nothandled_shutdown");
+          log.trace("result=unhandled_shutdown");
         }
-        metrics.counter(MetricRegistry.name(type.name(), 
"nothandled_shutdown")).inc();
+        metrics.incrementOutputCounter(type.name(), "unhandled_shutdown");
         break;
       case FAILED_RETRY:
         log.error(
             "Unexpected response while processing request. We never expect 
{}.", result.status());
-        metrics.counter(MetricRegistry.name(type.name(), 
"failed-retry")).inc();
+        metrics.incrementOutputCounter(type.name(), "failed_retry");
         break;
       case FAILED_NO_RETRY:
         if (log.isDebugEnabled()) {
@@ -574,7 +579,7 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
         } catch (Exception e) {
           log.error("Failed to send message to dead-letter queue, msg={}", 
item, e);
         }
-        metrics.counter(MetricRegistry.name(type.name(), 
"failed-no-retry")).inc();
+        metrics.incrementOutputCounter(type.name(), "failed_no_retry");
         break;
       default:
         if (log.isTraceEnabled()) {
@@ -605,7 +610,9 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
     } catch (Exception e) {
       log.warn("Exception closing Solr client on shutdown", e);
     } finally {
-      Util.logMetrics(metrics);
+      if (metrics instanceof OtelMetrics) {
+        Util.logMetrics(((OtelMetrics) metrics).getMetricManager());
+      }
     }
   }
 }
diff --git 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/MetricsServlet.java
 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/MetricsServlet.java
new file mode 100644
index 00000000000..9cb1ecae482
--- /dev/null
+++ 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/MetricsServlet.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.manager.consumer;
+
+import jakarta.servlet.ServletException;
+import jakarta.servlet.http.HttpServlet;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import org.apache.solr.handler.admin.MetricsHandler;
+import org.apache.solr.metrics.SolrMetricManager;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.PrometheusResponseWriter;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.servlet.ServletUtils;
+import org.apache.solr.servlet.SolrRequestParsers;
+
+/**
+ * Helper servlet that exports collected metrics in Prometheus format using 
{@link MetricsHandler}.
+ */
+public class MetricsServlet extends HttpServlet {
+  private static final long serialVersionUID = -2881083456665410780L;
+
+  public static final String SOLR_METRICS_MANAGER_ATTRIBUTE =
+      MetricsServlet.class.getName() + ".solrMetricsManager";
+
+  private SolrMetricManager metricManager;
+  private MetricsHandler metricsHandler;
+  private static final PrometheusResponseWriter writer = new 
PrometheusResponseWriter();
+
+  @Override
+  public void init() throws ServletException {
+    metricManager =
+        (SolrMetricManager) 
getServletContext().getAttribute(SOLR_METRICS_MANAGER_ATTRIBUTE);
+    metricsHandler = new MetricsHandler(metricManager);
+  }
+
+  @Override
+  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+      throws ServletException, IOException {
+    try {
+      final SolrQueryResponse solrQueryResponse = new SolrQueryResponse();
+      final String path = ServletUtils.getPathAfterContext(req);
+      SolrQueryRequest solrQueryRequest = 
SolrRequestParsers.DEFAULT.parse(null, path, req);
+      metricsHandler.handleRequestBody(solrQueryRequest, solrQueryResponse);
+      resp.setStatus(HttpServletResponse.SC_OK);
+      final String contentType = writer.getContentType(solrQueryRequest, 
solrQueryResponse);
+      resp.setContentType(contentType);
+      resp.setHeader("Cache-Control", "must-revalidate,no-cache,no-store");
+      writer.write(resp.getOutputStream(), solrQueryRequest, 
solrQueryResponse, contentType);
+    } catch (Exception e) {
+      throw new ServletException(e);
+    }
+  }
+}
diff --git 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/OtelMetrics.java
 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/OtelMetrics.java
new file mode 100644
index 00000000000..1c0fcd5bc77
--- /dev/null
+++ 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/OtelMetrics.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.manager.consumer;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.LongHistogram;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.metrics.SolrMetricManager;
+import org.apache.solr.metrics.SolrMetricsContext;
+import org.apache.solr.metrics.otel.OtelUnit;
+import org.apache.solr.metrics.otel.instruments.AttributedLongTimer;
+import org.apache.solr.opentelemetry.OtlpExporterFactory;
+
+public class OtelMetrics implements ConsumerMetrics {
+
+  public static final String REGISTRY = "crossdc.consumer.registry";
+  public static final String NAME_PREFIX = "crossdc_consumer_";
+  public static final String ATTR_TYPE = "type";
+  public static final String ATTR_SUBTYPE = "subtype";
+  public static final String ATTR_RESULT = "result";
+
+  protected final Map<String, Attributes> attributesCache = new 
ConcurrentHashMap<>();
+
+  protected SolrMetricManager metricManager;
+
+  protected LongCounter inputMsg;
+  protected LongCounter inputReq;
+  protected LongCounter collapsed;
+  protected LongCounter output;
+  protected LongHistogram outputBatchSizeHistogram;
+  protected LongHistogram outputTimeHistogram;
+  protected LongHistogram outputBackoffHistogram;
+  protected LongHistogram outputFirstAttemptHistogram;
+
+  public OtelMetrics() {
+    register(REGISTRY);
+  }
+
+  protected void register(String scope) {
+    this.metricManager = new SolrMetricManager(new 
OtlpExporterFactory().getExporter());
+    SolrMetricsContext metricsContext = new SolrMetricsContext(metricManager, 
scope);
+
+    inputMsg =
+        metricsContext.longCounter(
+            NAME_PREFIX + "input_msg_total", "Total number of input Kafka 
messages");
+
+    inputReq =
+        metricsContext.longCounter(
+            NAME_PREFIX + "input_req_total", "Total number of input Solr 
requests");
+
+    collapsed =
+        metricsContext.longCounter(
+            NAME_PREFIX + "collapsed_total", "Total number of collapsed update 
requests");
+
+    output =
+        metricsContext.longCounter(NAME_PREFIX + "output_total", "Total number 
of output requests");
+
+    outputBatchSizeHistogram =
+        metricsContext.longHistogram(
+            NAME_PREFIX + "output_batch_size", "Histogram of output batch 
sizes");
+
+    outputBackoffHistogram =
+        metricsContext.longHistogram(
+            NAME_PREFIX + "output_backoff_time", "Histogram of output backoff 
sleep times");
+
+    outputTimeHistogram =
+        metricsContext.longHistogram(
+            NAME_PREFIX + "output_time",
+            "Histogram of output request times",
+            OtelUnit.MILLISECONDS);
+
+    outputFirstAttemptHistogram =
+        metricsContext.longHistogram(
+            NAME_PREFIX + "output_first_attempt_time",
+            "Histogram of first attempt request times",
+            OtelUnit.MILLISECONDS);
+  }
+
+  protected static final String KEY_SEPARATOR = "#";
+
+  protected Attributes attr(String key1, String value1) {
+    String key = key1 + KEY_SEPARATOR + value1;
+    return attributesCache.computeIfAbsent(
+        key, k -> Attributes.builder().put(key1, value1).build());
+  }
+
+  protected Attributes attr(String key1, String value1, String key2, String 
value2) {
+    String key = key1 + KEY_SEPARATOR + value1 + KEY_SEPARATOR + key2 + 
KEY_SEPARATOR + value2;
+    return attributesCache.computeIfAbsent(
+        key, k -> Attributes.builder().put(key1, value1).put(key2, 
value2).build());
+  }
+
+  public SolrMetricManager getMetricManager() {
+    return metricManager;
+  }
+
+  @Override
+  public void incrementCollapsedCounter() {
+    collapsed.add(1L);
+  }
+
+  @Override
+  public void incrementInputMsgCounter(long delta) {
+    inputMsg.add(delta);
+  }
+
+  @Override
+  public void incrementInputReqCounter(String type, String subType, int delta) 
{
+    inputReq.add(delta, attr(ATTR_TYPE, type, ATTR_SUBTYPE, subType));
+  }
+
+  @Override
+  public void incrementOutputCounter(String type, String result, int delta) {
+    output.add(delta, attr(ATTR_TYPE, type, ATTR_RESULT, result));
+  }
+
+  @Override
+  public void recordOutputBatchSize(MirroredSolrRequest.Type type, 
SolrRequest<?> solrRequest) {
+    if (type != MirroredSolrRequest.Type.UPDATE) {
+      outputBatchSizeHistogram.record(
+          1,
+          attr(
+              ATTR_TYPE,
+              type.name(),
+              ATTR_SUBTYPE,
+              solrRequest.getParams().get("action", "unknown")));
+      return;
+    }
+    UpdateRequest req = (UpdateRequest) solrRequest;
+    int addCount = req.getDocuments() == null ? 0 : req.getDocuments().size();
+    int dbiCount = req.getDeleteById() == null ? 0 : 
req.getDeleteById().size();
+    int dbqCount = req.getDeleteQuery() == null ? 0 : 
req.getDeleteQuery().size();
+    if (addCount > 0) {
+      outputBatchSizeHistogram.record(addCount, attr(ATTR_TYPE, type.name(), 
ATTR_SUBTYPE, "add"));
+    }
+    if (dbiCount > 0) {
+      outputBatchSizeHistogram.record(
+          dbiCount, attr(ATTR_TYPE, type.name(), ATTR_SUBTYPE, 
"delete_by_id"));
+    }
+    if (dbqCount > 0) {
+      outputBatchSizeHistogram.record(
+          dbqCount, attr(ATTR_TYPE, type.name(), ATTR_SUBTYPE, 
"delete_by_query"));
+    }
+  }
+
+  @Override
+  public void recordOutputBackoffTime(MirroredSolrRequest.Type type, long 
backoffTimeMs) {
+    outputBackoffHistogram.record(backoffTimeMs, attr(ATTR_TYPE, type.name()));
+  }
+
+  @Override
+  public void recordOutputFirstAttemptTime(MirroredSolrRequest.Type type, long 
firstAttemptTimeNs) {
+    outputFirstAttemptHistogram.record(firstAttemptTimeNs, attr(ATTR_TYPE, 
type.name()));
+  }
+
+  @Override
+  public ConsumerTimer startOutputTimeTimer(final String requestType) {
+    final AttributedLongTimer timer =
+        new AttributedLongTimer(outputTimeHistogram, attr(ATTR_TYPE, 
requestType));
+    final AttributedLongTimer.MetricTimer metricTimer = timer.start();
+    return () -> metricTimer.stop();
+  }
+}
diff --git 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/ThreadDump.java
 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/ThreadDump.java
new file mode 100644
index 00000000000..2f0fb116bf9
--- /dev/null
+++ 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/ThreadDump.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.manager.consumer;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.lang.management.LockInfo;
+import java.lang.management.MonitorInfo;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.Locale;
+
+/**
+ * A convenience class for getting a thread dump.
+ *
+ * <p>Copy of the code in <code>
+ * 
https://github.com/dropwizard/metrics/blob/release/5.0.x/metrics-jvm/src/main/java/io/dropwizard/metrics5/jvm/ThreadDump.java
+ * </code>
+ */
+public class ThreadDump {
+  private final ThreadMXBean threadMXBean;
+
+  public ThreadDump(ThreadMXBean threadMXBean) {
+    this.threadMXBean = threadMXBean;
+  }
+
+  /**
+   * Dumps all of the threads' current information, including synchronization, 
to an output stream.
+   *
+   * @param out an output stream
+   */
+  public void dump(OutputStream out) {
+    dump(true, true, out);
+  }
+
+  /**
+   * Dumps all of the threads' current information, optionally including 
synchronization, to an
+   * output stream.
+   *
+   * <p>Having control over including synchronization info allows using this 
method (and its
+   * wrappers, i.e. ThreadDumpServlet) in environments where getting object 
monitor and/or ownable
+   * synchronizer usage is not supported. It can also speed things up.
+   *
+   * <p>See {@link ThreadMXBean#dumpAllThreads(boolean, boolean)}
+   *
+   * @param lockedMonitors dump all locked monitors if true
+   * @param lockedSynchronizers dump all locked ownable synchronizers if true
+   * @param out an output stream
+   */
+  public void dump(boolean lockedMonitors, boolean lockedSynchronizers, 
OutputStream out) {
+    final ThreadInfo[] threads =
+        this.threadMXBean.dumpAllThreads(lockedMonitors, lockedSynchronizers);
+    final PrintWriter writer = new PrintWriter(new OutputStreamWriter(out, 
UTF_8));
+
+    for (int ti = threads.length - 1; ti >= 0; ti--) {
+      final ThreadInfo t = threads[ti];
+      writer.printf(
+          Locale.ROOT,
+          "\"%s\" id=%d state=%s",
+          t.getThreadName(),
+          t.getThreadId(),
+          t.getThreadState());
+      final LockInfo lock = t.getLockInfo();
+      if (lock != null && t.getThreadState() != Thread.State.BLOCKED) {
+        writer.printf(
+            Locale.ROOT,
+            "%n    - waiting on <0x%08x> (a %s)",
+            lock.getIdentityHashCode(),
+            lock.getClassName());
+        writer.printf(
+            Locale.ROOT,
+            "%n    - locked <0x%08x> (a %s)",
+            lock.getIdentityHashCode(),
+            lock.getClassName());
+      } else if (lock != null && t.getThreadState() == Thread.State.BLOCKED) {
+        writer.printf(
+            Locale.ROOT,
+            "%n    - waiting to lock <0x%08x> (a %s)",
+            lock.getIdentityHashCode(),
+            lock.getClassName());
+      }
+
+      if (t.isSuspended()) {
+        writer.print(" (suspended)");
+      }
+
+      if (t.isInNative()) {
+        writer.print(" (running in native)");
+      }
+
+      writer.println();
+      if (t.getLockOwnerName() != null) {
+        writer.printf(
+            Locale.ROOT, "     owned by %s id=%d%n", t.getLockOwnerName(), 
t.getLockOwnerId());
+      }
+
+      final StackTraceElement[] elements = t.getStackTrace();
+      final MonitorInfo[] monitors = t.getLockedMonitors();
+
+      for (int i = 0; i < elements.length; i++) {
+        final StackTraceElement element = elements[i];
+        writer.printf(Locale.ROOT, "    at %s%n", element);
+        for (int j = 1; j < monitors.length; j++) {
+          final MonitorInfo monitor = monitors[j];
+          if (monitor.getLockedStackDepth() == i) {
+            writer.printf(Locale.ROOT, "      - locked %s%n", monitor);
+          }
+        }
+      }
+      writer.println();
+
+      final LockInfo[] locks = t.getLockedSynchronizers();
+      if (locks.length > 0) {
+        writer.printf(Locale.ROOT, "    Locked synchronizers: count = %d%n", 
locks.length);
+        for (LockInfo l : locks) {
+          writer.printf(Locale.ROOT, "      - %s%n", l);
+        }
+        writer.println();
+      }
+    }
+
+    writer.println();
+    writer.flush();
+  }
+}
diff --git 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/ThreadDumpServlet.java
 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/ThreadDumpServlet.java
new file mode 100644
index 00000000000..cea2bb33988
--- /dev/null
+++ 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/ThreadDumpServlet.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.manager.consumer;
+
+import jakarta.servlet.ServletException;
+import jakarta.servlet.http.HttpServlet;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
+import java.nio.charset.StandardCharsets;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.servlet.SolrRequestParsers;
+
+/**
+ * An HTTP servlets which outputs a {@code text/plain} dump of all threads in 
the VM. Only responds
+ * to {@code GET} requests.
+ *
+ * <p>Copy of the code from <code>
+ * 
https://github.com/dropwizard/metrics/blob/release/5.0.x/metrics-jakarta-servlets/src/main/java/io/dropwizard/metrics5/servlets/ThreadDumpServlet.java
+ * </code>
+ */
+public class ThreadDumpServlet extends HttpServlet {
+
+  private static final long serialVersionUID = -2690343532336103046L;
+  private static final String CONTENT_TYPE = "text/plain";
+
+  private transient ThreadDump threadDump;
+
+  @Override
+  public void init() throws ServletException {
+    try {
+      // Some PaaS like Google App Engine blacklist java.lang.managament
+      this.threadDump = new ThreadDump(ManagementFactory.getThreadMXBean());
+    } catch (NoClassDefFoundError ncdfe) {
+      this.threadDump = null; // we won't be able to provide thread dump
+    }
+  }
+
+  @Override
+  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+      throws ServletException, IOException {
+    final SolrParams queryParams = 
SolrRequestParsers.parseQueryString(req.getQueryString());
+    final boolean includeMonitors = queryParams.getBool("monitors", true);
+    final boolean includeSynchronizers = queryParams.getBool("synchronizers", 
true);
+
+    resp.setStatus(HttpServletResponse.SC_OK);
+    resp.setContentType(CONTENT_TYPE);
+    resp.setHeader("Cache-Control", "must-revalidate,no-cache,no-store");
+    if (threadDump == null) {
+      resp.getOutputStream()
+          .write(
+              "Sorry your runtime environment does not allow to dump 
threads.\r\n"
+                  .getBytes(StandardCharsets.UTF_8));
+      return;
+    }
+    try (OutputStream output = resp.getOutputStream()) {
+      threadDump.dump(includeMonitors, includeSynchronizers, output);
+    }
+  }
+}
diff --git 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/Util.java
 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/Util.java
index 4c964c1a739..f0d4dcca4da 100644
--- 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/Util.java
+++ 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/Util.java
@@ -16,14 +16,9 @@
  */
 package org.apache.solr.crossdc.manager.consumer;
 
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Timer;
+import java.io.ByteArrayOutputStream;
 import java.lang.invoke.MethodHandles;
-import java.util.Map;
+import java.nio.charset.StandardCharsets;
 import java.util.Properties;
 import java.util.Set;
 import org.apache.kafka.clients.admin.AdminClient;
@@ -34,41 +29,33 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
+import org.apache.solr.handler.admin.MetricsHandler;
+import org.apache.solr.metrics.SolrMetricManager;
+import org.apache.solr.response.PrometheusResponseWriter;
+import org.apache.solr.response.SolrQueryResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class Util {
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  @SuppressWarnings("rawtypes")
-  public static void logMetrics(MetricRegistry metricRegistry) {
-    log.info("Metrics Registry:");
-    for (Map.Entry<String, Gauge> entry : 
metricRegistry.getGauges().entrySet()) {
-      if (log.isInfoEnabled()) {
-        log.info("Gauge {}: {}", entry.getKey(), entry.getValue().getValue());
-      }
-    }
-    for (Map.Entry<String, Counter> entry : 
metricRegistry.getCounters().entrySet()) {
-      if (log.isInfoEnabled()) {
-        log.info("Counter {}: {}", entry.getKey(), 
entry.getValue().getCount());
-      }
-    }
-    for (Map.Entry<String, Histogram> entry : 
metricRegistry.getHistograms().entrySet()) {
-      if (log.isInfoEnabled()) {
-        log.info("Histogram {}: {}", entry.getKey(), 
entry.getValue().getSnapshot().toString());
-      }
-    }
-    for (Map.Entry<String, Meter> entry : 
metricRegistry.getMeters().entrySet()) {
-      if (log.isInfoEnabled()) {
-        log.info("Meter {}: {}", entry.getKey(), entry.getValue().getCount());
-      }
-    }
-    for (Map.Entry<String, Timer> entry : 
metricRegistry.getTimers().entrySet()) {
-      if (log.isInfoEnabled()) {
-        log.info("Timer {}: {}", entry.getKey(), 
entry.getValue().getSnapshot().toString());
-      }
+  public static void logMetrics(SolrMetricManager metricManager) {
+    SolrQueryResponse rsp = new SolrQueryResponse();
+    new MetricsHandler(metricManager)
+        .handleRequest(SolrParams.of(), (key, value) -> rsp.add(key, value));
+    String output;
+    try {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      new PrometheusResponseWriter()
+          .write(baos, null, rsp, 
PrometheusResponseWriter.CONTENT_TYPE_PROMETHEUS);
+      output = baos.toString(StandardCharsets.UTF_8);
+    } catch (Exception e) {
+      log.error("Error while writing final metrics", e);
+      output = rsp.toString();
     }
+    log.info("#### Consumer Metrics: ####\n{}", output);
   }
 
   public static void printKafkaInfo(String host, String groupId) {
diff --git 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java
 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java
index 29f83052fe1..de3484ca45a 100644
--- 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java
+++ 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java
@@ -16,9 +16,6 @@
  */
 package org.apache.solr.crossdc.manager.messageprocessor;
 
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.SharedMetricRegistries;
-import com.codahale.metrics.Timer;
 import java.lang.invoke.MethodHandles;
 import java.util.List;
 import java.util.Map;
@@ -36,12 +33,13 @@ import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.crossdc.common.CrossDcConstants;
 import org.apache.solr.crossdc.common.IQueueHandler;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
 import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
 import org.apache.solr.crossdc.common.SolrExceptionUtil;
-import org.apache.solr.crossdc.manager.consumer.Consumer;
+import org.apache.solr.crossdc.manager.consumer.ConsumerMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -59,16 +57,17 @@ public class SolrMessageProcessor extends MessageProcessor
     implements IQueueHandler<MirroredSolrRequest<?>> {
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private final MetricRegistry metrics =
-      SharedMetricRegistries.getOrCreate(Consumer.METRICS_REGISTRY);
-
+  final ConsumerMetrics metrics;
   final Supplier<CloudSolrClient> clientSupplier;
 
   private static final String VERSION_FIELD = "_version_";
 
   public SolrMessageProcessor(
-      Supplier<CloudSolrClient> clientSupplier, ResubmitBackoffPolicy 
resubmitBackoffPolicy) {
+      ConsumerMetrics metrics,
+      Supplier<CloudSolrClient> clientSupplier,
+      ResubmitBackoffPolicy resubmitBackoffPolicy) {
     super(resubmitBackoffPolicy);
+    this.metrics = metrics;
     this.clientSupplier = clientSupplier;
   }
 
@@ -147,7 +146,7 @@ public class SolrMessageProcessor extends MessageProcessor
       sleepTimeMs = Math.max(1, Long.parseLong(backoffTimeSuggested));
     }
     log.info("Consumer backoff. sleepTimeMs={}", sleepTimeMs);
-    metrics.meter(MetricRegistry.name(request.getType().name(), 
"backoff")).mark(sleepTimeMs);
+    metrics.recordOutputBackoffTime(request.getType(), sleepTimeMs);
     uncheckedSleep(sleepTimeMs);
   }
 
@@ -211,18 +210,21 @@ public class SolrMessageProcessor extends MessageProcessor
               "Skipping update request to nonexistent / not updatable 
collection {}",
               request.getCollection());
         }
-        metrics.counter(MetricRegistry.name(type.name(), 
"invalid-collection")).inc();
+        metrics.incrementOutputCounter(type.name(), 
"failed_collection_not_found");
         return new Result<>(ResultStatus.FAILED_NO_RETRY, mirroredSolrRequest);
       }
     }
 
     Result<MirroredSolrRequest<?>> result;
     SolrResponseBase response;
-    Timer.Context ctx = metrics.timer(MetricRegistry.name(type.name(), 
"outputTime")).time();
+    ConsumerMetrics.ConsumerTimer timer = 
metrics.startOutputTimeTimer(type.name());
     try {
       response = (SolrResponseBase) request.process(clientSupplier.get());
     } finally {
-      ctx.stop();
+      // unit tests might not care
+      if (timer != null) {
+        timer.close();
+      }
     }
 
     int status = response.getStatus();
@@ -232,7 +234,7 @@ public class SolrMessageProcessor extends MessageProcessor
     }
 
     if (status != 0) {
-      metrics.counter(MetricRegistry.name(type.name(), "outputErrors")).inc();
+      metrics.incrementOutputCounter(type.name(), "solr_error");
       throw new SolrException(SolrException.ErrorCode.getErrorCode(status), 
"response=" + response);
     }
 
@@ -325,11 +327,11 @@ public class SolrMessageProcessor extends MessageProcessor
     // submitting on the primary side until the request is eligible to be 
consumed on the buddy side
     // (or vice versa).
     if (mirroredSolrRequest.getAttempt() == 1) {
-      final long latency = System.nanoTime() - 
mirroredSolrRequest.getSubmitTimeNanos();
-      log.debug("First attempt latency = {} ns", latency);
-      metrics
-          .timer(MetricRegistry.name(mirroredSolrRequest.getType().name(), 
"outputLatency"))
-          .update(latency, TimeUnit.NANOSECONDS);
+      final long latencyMs =
+          TimeUnit.NANOSECONDS.toMillis(
+              TimeSource.CURRENT_TIME.getTimeNs() - 
mirroredSolrRequest.getSubmitTimeNanos());
+      log.debug("First attempt latency = {} ms", latencyMs);
+      metrics.recordOutputFirstAttemptTime(mirroredSolrRequest.getType(), 
latencyMs);
     }
   }
 
@@ -398,7 +400,7 @@ public class SolrMessageProcessor extends MessageProcessor
     if (result.status().equals(ResultStatus.FAILED_RESUBMIT)) {
       final long backoffMs = 
getResubmitBackoffPolicy().getBackoffTimeMs(result.getItem());
       if (backoffMs > 0L) {
-        metrics.meter(MetricRegistry.name(type.name(), 
"backoff")).mark(backoffMs);
+        metrics.recordOutputBackoffTime(type, backoffMs);
         try {
           Thread.sleep(backoffMs);
         } catch (final InterruptedException ex) {
diff --git 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/DeleteByQueryToIdTest.java
 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/DeleteByQueryToIdTest.java
index 421f7b1c0c1..1898def635c 100644
--- 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/DeleteByQueryToIdTest.java
+++ 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/DeleteByQueryToIdTest.java
@@ -42,6 +42,7 @@ import org.apache.solr.common.util.SuppressForbidden;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
 import org.apache.solr.crossdc.manager.consumer.Consumer;
+import org.apache.solr.crossdc.manager.consumer.ConsumerMetrics;
 import org.apache.solr.crossdc.manager.consumer.KafkaCrossDcConsumer;
 import org.apache.solr.crossdc.manager.messageprocessor.SolrMessageProcessor;
 import org.apache.solr.util.SolrKafkaTestsIgnoredThreadsFilter;
@@ -160,11 +161,12 @@ public class DeleteByQueryToIdTest extends 
SolrCloudTestCase {
         new Consumer() {
           @Override
           protected CrossDcConsumer getCrossDcConsumer(
-              KafkaCrossDcConf conf, CountDownLatch startLatch) {
-            return new KafkaCrossDcConsumer(conf, startLatch) {
+              KafkaCrossDcConf conf, ConsumerMetrics metrics, CountDownLatch 
startLatch) {
+            return new KafkaCrossDcConsumer(conf, metrics, startLatch) {
               @Override
               protected SolrMessageProcessor createSolrMessageProcessor() {
-                return new SolrMessageProcessor(solrClientSupplier, 
resubmitRequest -> 0L) {
+                return new SolrMessageProcessor(
+                    metrics, solrClientSupplier, resubmitRequest -> 0L) {
                   @Override
                   public Result<MirroredSolrRequest<?>> handleItem(
                       MirroredSolrRequest<?> mirroredSolrRequest) {
diff --git 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SimpleSolrIntegrationTest.java
 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SimpleSolrIntegrationTest.java
index 06f5add80c0..d6bf7fdcc60 100644
--- 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SimpleSolrIntegrationTest.java
+++ 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SimpleSolrIntegrationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.crossdc.manager;
 
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
 import java.util.Map;
@@ -26,6 +27,7 @@ import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.manager.consumer.OtelMetrics;
 import org.apache.solr.crossdc.manager.messageprocessor.SolrMessageProcessor;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -51,7 +53,7 @@ public class SimpleSolrIntegrationTest extends 
SolrCloudTestCase {
 
     CloudSolrClient cloudClient1 = cluster1.getSolrClient();
 
-    processor = new SolrMessageProcessor(() -> cloudClient1, null);
+    processor = new SolrMessageProcessor(mock(OtelMetrics.class), () -> 
cloudClient1, null);
 
     CollectionAdminRequest.Create create =
         CollectionAdminRequest.createCollection(COLLECTION, 1, 1);
diff --git 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
index 3770884b372..7d7941dc442 100644
--- 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
+++ 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
@@ -18,13 +18,14 @@ package org.apache.solr.crossdc.manager;
 
 import static 
org.apache.solr.crossdc.common.KafkaCrossDcConf.DEFAULT_MAX_REQUEST_SIZE;
 import static 
org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE_DOCS;
-import static org.apache.solr.crossdc.common.KafkaCrossDcConf.PORT;
 
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
+import java.io.InputStream;
 import java.lang.invoke.MethodHandles;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
@@ -32,6 +33,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import org.apache.commons.io.IOUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -40,15 +42,20 @@ import 
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.lucene.tests.util.QuickPatchThreadsFilter;
 import org.apache.solr.SolrIgnoredThreadsFilter;
 import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.jetty.HttpJettySolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.GenericSolrRequest;
 import org.apache.solr.client.solrj.request.SolrQuery;
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.InputStreamResponseParser;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
@@ -101,7 +108,7 @@ public class SolrAndKafkaIntegrationTest extends 
SolrCloudTestCase {
     uceh = Thread.getDefaultUncaughtExceptionHandler();
     Thread.setDefaultUncaughtExceptionHandler(
         (t, e) -> log.error("Uncaught exception in thread {}", t, e));
-    System.setProperty(PORT, "-1");
+    System.setProperty("otel.metrics.exporter", "prometheus");
     consumer = new Consumer();
     Properties config = new Properties();
 
@@ -185,7 +192,7 @@ public class SolrAndKafkaIntegrationTest extends 
SolrCloudTestCase {
 
     client.commit(COLLECTION);
 
-    System.out.println("Sent producer record");
+    log.info("Sent producer record");
 
     assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 1);
   }
@@ -332,6 +339,39 @@ public class SolrAndKafkaIntegrationTest extends 
SolrCloudTestCase {
     assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 5000);
   }
 
+  @Test
+  @SuppressWarnings({"unchecked"})
+  public void testMetrics() throws Exception {
+    CloudSolrClient client = solrCluster1.getSolrClient();
+    SolrInputDocument doc = new SolrInputDocument();
+    doc.addField("id", String.valueOf(new Date().getTime()));
+    doc.addField("text", "some test");
+
+    client.add(COLLECTION, doc);
+
+    client.commit(COLLECTION);
+
+    log.info("Sent producer record");
+
+    assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 1);
+
+    String baseUrl = "http://localhost:"; + KafkaCrossDcConf.DEFAULT_PORT;
+    HttpJettySolrClient httpJettySolrClient =
+        new HttpJettySolrClient.Builder(baseUrl).useHttp1_1(true).build();
+    try {
+      GenericSolrRequest req = new GenericSolrRequest(SolrRequest.METHOD.GET, 
"/metrics");
+      req.setResponseParser(new InputStreamResponseParser(null));
+      NamedList<Object> rsp = httpJettySolrClient.request(req);
+      String content =
+          IOUtils.toString(
+              (InputStream) rsp.get(InputStreamResponseParser.STREAM_KEY), 
StandardCharsets.UTF_8);
+      assertTrue(content, content.contains("crossdc_consumer_output_total"));
+    } finally {
+      httpJettySolrClient.close();
+      client.close();
+    }
+  }
+
   private void assertCluster2EventuallyHasDocs(String collection, String 
query, int expectedNumDocs)
       throws Exception {
     assertClusterEventuallyHasDocs(
diff --git 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java
 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java
index 9f8e904a72b..89f85f36db2 100644
--- 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java
+++ 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java
@@ -111,7 +111,7 @@ public class KafkaCrossDcConsumerTest {
     // Set necessary configurations
 
     kafkaCrossDcConsumer =
-        new KafkaCrossDcConsumer(conf, new CountDownLatch(0)) {
+        new KafkaCrossDcConsumer(conf, ConsumerMetrics.NOOP, new 
CountDownLatch(0)) {
           @Override
           public KafkaConsumer<String, MirroredSolrRequest<?>> 
createKafkaConsumer(
               Properties properties) {
@@ -182,7 +182,7 @@ public class KafkaCrossDcConsumerTest {
     KafkaConsumer<String, MirroredSolrRequest<?>> mockConsumer = 
mock(KafkaConsumer.class);
     KafkaCrossDcConsumer kafkaCrossDcConsumer =
         spy(
-            new KafkaCrossDcConsumer(conf, startLatch) {
+            new KafkaCrossDcConsumer(conf, ConsumerMetrics.NOOP, startLatch) {
               @Override
               public KafkaConsumer<String, MirroredSolrRequest<?>> 
createKafkaConsumer(
                   Properties properties) {
@@ -457,7 +457,7 @@ public class KafkaCrossDcConsumerTest {
         .handleItem(any());
     KafkaCrossDcConsumer spyConsumer =
         spy(
-            new KafkaCrossDcConsumer(conf, new CountDownLatch(1)) {
+            new KafkaCrossDcConsumer(conf, ConsumerMetrics.NOOP, new 
CountDownLatch(1)) {
               @Override
               public KafkaConsumer<String, MirroredSolrRequest<?>> 
createKafkaConsumer(
                   Properties properties) {
@@ -551,7 +551,7 @@ public class KafkaCrossDcConsumerTest {
   private KafkaCrossDcConsumer createCrossDcConsumerSpy(
       KafkaConsumer<String, MirroredSolrRequest<?>> mockConsumer) {
     return spy(
-        new KafkaCrossDcConsumer(conf, new CountDownLatch(1)) {
+        new KafkaCrossDcConsumer(conf, ConsumerMetrics.NOOP, new 
CountDownLatch(1)) {
           @Override
           public KafkaConsumer<String, MirroredSolrRequest<?>> 
createKafkaConsumer(
               Properties properties) {
diff --git 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java
 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java
index 6110cfbbbad..e7e26d2e9bf 100644
--- 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java
+++ 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java
@@ -36,6 +36,7 @@ import org.apache.solr.common.util.NamedList;
 import org.apache.solr.crossdc.common.IQueueHandler;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
 import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
+import org.apache.solr.crossdc.manager.consumer.OtelMetrics;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -55,7 +56,8 @@ public class SolrMessageProcessorTest {
   public void setUp() {
     client = mock(CloudSolrClient.class);
     resubmitBackoffPolicy = mock(ResubmitBackoffPolicy.class);
-    solrMessageProcessor = new SolrMessageProcessor(() -> client, 
resubmitBackoffPolicy);
+    solrMessageProcessor =
+        new SolrMessageProcessor(mock(OtelMetrics.class), () -> client, 
resubmitBackoffPolicy);
   }
 
   /** Should handle MirroredSolrRequest and return a failed result with no 
retry */
diff --git 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/TestMessageProcessor.java
 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/TestMessageProcessor.java
index a3c162ce761..769fc58cb29 100644
--- 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/TestMessageProcessor.java
+++ 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/TestMessageProcessor.java
@@ -38,6 +38,8 @@ import org.apache.solr.common.util.NamedList;
 import org.apache.solr.crossdc.common.IQueueHandler;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
 import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
+import org.apache.solr.crossdc.manager.consumer.ConsumerMetrics;
+import org.apache.solr.crossdc.manager.consumer.OtelMetrics;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -70,7 +72,8 @@ public class TestMessageProcessor {
   public void setUp() {
     MockitoAnnotations.initMocks(this);
 
-    processor = Mockito.spy(new SolrMessageProcessor(() -> solrClient, 
backoffPolicy));
+    ConsumerMetrics metrics = Mockito.mock(OtelMetrics.class);
+    processor = Mockito.spy(new SolrMessageProcessor(metrics, () -> 
solrClient, backoffPolicy));
     Mockito.doNothing().when(processor).uncheckedSleep(anyLong());
   }
 
diff --git a/solr/licenses/metrics-healthchecks-4.2.26.jar.sha1 
b/solr/licenses/metrics-healthchecks-4.2.26.jar.sha1
deleted file mode 100644
index 4d1ab22e72a..00000000000
--- a/solr/licenses/metrics-healthchecks-4.2.26.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-50819fda1745b03673eff3bdcddf914999045673
diff --git a/solr/licenses/metrics-jakarta-servlets-4.2.26.jar.sha1 
b/solr/licenses/metrics-jakarta-servlets-4.2.26.jar.sha1
deleted file mode 100644
index 023b4931bb1..00000000000
--- a/solr/licenses/metrics-jakarta-servlets-4.2.26.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-da590186279c40e3187b50ca05ad9a6c4503db33
diff --git a/solr/licenses/metrics-json-4.2.26.jar.sha1 
b/solr/licenses/metrics-json-4.2.26.jar.sha1
deleted file mode 100644
index 1a1b3db2247..00000000000
--- a/solr/licenses/metrics-json-4.2.26.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-1eac3853bb964647b38d7e1d7b66e515443437d6
diff --git a/solr/licenses/metrics-jvm-4.2.26.jar.sha1 
b/solr/licenses/metrics-jvm-4.2.26.jar.sha1
deleted file mode 100644
index b6e5eda9043..00000000000
--- a/solr/licenses/metrics-jvm-4.2.26.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-a7df0386df8c8938dea6cfacdc67972b9fd1a01e
diff --git a/solr/licenses/metrics-jvm-LICENSE-ASL.txt 
b/solr/licenses/metrics-jvm-LICENSE-ASL.txt
deleted file mode 100644
index ccb320c7daa..00000000000
--- a/solr/licenses/metrics-jvm-LICENSE-ASL.txt
+++ /dev/null
@@ -1,203 +0,0 @@
-
-                                 Apache License
-                           Version 2.0, January 2004
-                        http://www.apache.org/licenses/
-
-   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
-   1. Definitions.
-
-      "License" shall mean the terms and conditions for use, reproduction,
-      and distribution as defined by Sections 1 through 9 of this document.
-
-      "Licensor" shall mean the copyright owner or entity authorized by
-      the copyright owner that is granting the License.
-
-      "Legal Entity" shall mean the union of the acting entity and all
-      other entities that control, are controlled by, or are under common
-      control with that entity. For the purposes of this definition,
-      "control" means (i) the power, direct or indirect, to cause the
-      direction or management of such entity, whether by contract or
-      otherwise, or (ii) ownership of fifty percent (50%) or more of the
-      outstanding shares, or (iii) beneficial ownership of such entity.
-
-      "You" (or "Your") shall mean an individual or Legal Entity
-      exercising permissions granted by this License.
-
-      "Source" form shall mean the preferred form for making modifications,
-      including but not limited to software source code, documentation
-      source, and configuration files.
-
-      "Object" form shall mean any form resulting from mechanical
-      transformation or translation of a Source form, including but
-      not limited to compiled object code, generated documentation,
-      and conversions to other media types.
-
-      "Work" shall mean the work of authorship, whether in Source or
-      Object form, made available under the License, as indicated by a
-      copyright notice that is included in or attached to the work
-      (an example is provided in the Appendix below).
-
-      "Derivative Works" shall mean any work, whether in Source or Object
-      form, that is based on (or derived from) the Work and for which the
-      editorial revisions, annotations, elaborations, or other modifications
-      represent, as a whole, an original work of authorship. For the purposes
-      of this License, Derivative Works shall not include works that remain
-      separable from, or merely link (or bind by name) to the interfaces of,
-      the Work and Derivative Works thereof.
-
-      "Contribution" shall mean any work of authorship, including
-      the original version of the Work and any modifications or additions
-      to that Work or Derivative Works thereof, that is intentionally
-      submitted to Licensor for inclusion in the Work by the copyright owner
-      or by an individual or Legal Entity authorized to submit on behalf of
-      the copyright owner. For the purposes of this definition, "submitted"
-      means any form of electronic, verbal, or written communication sent
-      to the Licensor or its representatives, including but not limited to
-      communication on electronic mailing lists, source code control systems,
-      and issue tracking systems that are managed by, or on behalf of, the
-      Licensor for the purpose of discussing and improving the Work, but
-      excluding communication that is conspicuously marked or otherwise
-      designated in writing by the copyright owner as "Not a Contribution."
-
-      "Contributor" shall mean Licensor and any individual or Legal Entity
-      on behalf of whom a Contribution has been received by Licensor and
-      subsequently incorporated within the Work.
-
-   2. Grant of Copyright License. Subject to the terms and conditions of
-      this License, each Contributor hereby grants to You a perpetual,
-      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
-      copyright license to reproduce, prepare Derivative Works of,
-      publicly display, publicly perform, sublicense, and distribute the
-      Work and such Derivative Works in Source or Object form.
-
-   3. Grant of Patent License. Subject to the terms and conditions of
-      this License, each Contributor hereby grants to You a perpetual,
-      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
-      (except as stated in this section) patent license to make, have made,
-      use, offer to sell, sell, import, and otherwise transfer the Work,
-      where such license applies only to those patent claims licensable
-      by such Contributor that are necessarily infringed by their
-      Contribution(s) alone or by combination of their Contribution(s)
-      with the Work to which such Contribution(s) was submitted. If You
-      institute patent litigation against any entity (including a
-      cross-claim or counterclaim in a lawsuit) alleging that the Work
-      or a Contribution incorporated within the Work constitutes direct
-      or contributory patent infringement, then any patent licenses
-      granted to You under this License for that Work shall terminate
-      as of the date such litigation is filed.
-
-   4. Redistribution. You may reproduce and distribute copies of the
-      Work or Derivative Works thereof in any medium, with or without
-      modifications, and in Source or Object form, provided that You
-      meet the following conditions:
-
-      (a) You must give any other recipients of the Work or
-          Derivative Works a copy of this License; and
-
-      (b) You must cause any modified files to carry prominent notices
-          stating that You changed the files; and
-
-      (c) You must retain, in the Source form of any Derivative Works
-          that You distribute, all copyright, patent, trademark, and
-          attribution notices from the Source form of the Work,
-          excluding those notices that do not pertain to any part of
-          the Derivative Works; and
-
-      (d) If the Work includes a "NOTICE" text file as part of its
-          distribution, then any Derivative Works that You distribute must
-          include a readable copy of the attribution notices contained
-          within such NOTICE file, excluding those notices that do not
-          pertain to any part of the Derivative Works, in at least one
-          of the following places: within a NOTICE text file distributed
-          as part of the Derivative Works; within the Source form or
-          documentation, if provided along with the Derivative Works; or,
-          within a display generated by the Derivative Works, if and
-          wherever such third-party notices normally appear. The contents
-          of the NOTICE file are for informational purposes only and
-          do not modify the License. You may add Your own attribution
-          notices within Derivative Works that You distribute, alongside
-          or as an addendum to the NOTICE text from the Work, provided
-          that such additional attribution notices cannot be construed
-          as modifying the License.
-
-      You may add Your own copyright statement to Your modifications and
-      may provide additional or different license terms and conditions
-      for use, reproduction, or distribution of Your modifications, or
-      for any such Derivative Works as a whole, provided Your use,
-      reproduction, and distribution of the Work otherwise complies with
-      the conditions stated in this License.
-
-   5. Submission of Contributions. Unless You explicitly state otherwise,
-      any Contribution intentionally submitted for inclusion in the Work
-      by You to the Licensor shall be under the terms and conditions of
-      this License, without any additional terms or conditions.
-      Notwithstanding the above, nothing herein shall supersede or modify
-      the terms of any separate license agreement you may have executed
-      with Licensor regarding such Contributions.
-
-   6. Trademarks. This License does not grant permission to use the trade
-      names, trademarks, service marks, or product names of the Licensor,
-      except as required for reasonable and customary use in describing the
-      origin of the Work and reproducing the content of the NOTICE file.
-
-   7. Disclaimer of Warranty. Unless required by applicable law or
-      agreed to in writing, Licensor provides the Work (and each
-      Contributor provides its Contributions) on an "AS IS" BASIS,
-      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-      implied, including, without limitation, any warranties or conditions
-      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
-      PARTICULAR PURPOSE. You are solely responsible for determining the
-      appropriateness of using or redistributing the Work and assume any
-      risks associated with Your exercise of permissions under this License.
-
-   8. Limitation of Liability. In no event and under no legal theory,
-      whether in tort (including negligence), contract, or otherwise,
-      unless required by applicable law (such as deliberate and grossly
-      negligent acts) or agreed to in writing, shall any Contributor be
-      liable to You for damages, including any direct, indirect, special,
-      incidental, or consequential damages of any character arising as a
-      result of this License or out of the use or inability to use the
-      Work (including but not limited to damages for loss of goodwill,
-      work stoppage, computer failure or malfunction, or any and all
-      other commercial damages or losses), even if such Contributor
-      has been advised of the possibility of such damages.
-
-   9. Accepting Warranty or Additional Liability. While redistributing
-      the Work or Derivative Works thereof, You may choose to offer,
-      and charge a fee for, acceptance of support, warranty, indemnity,
-      or other liability obligations and/or rights consistent with this
-      License. However, in accepting such obligations, You may act only
-      on Your own behalf and on Your sole responsibility, not on behalf
-      of any other Contributor, and only if You agree to indemnify,
-      defend, and hold each Contributor harmless for any liability
-      incurred by, or claims asserted against, such Contributor by reason
-      of your accepting any such warranty or additional liability.
-
-   END OF TERMS AND CONDITIONS
-
-   APPENDIX: How to apply the Apache License to your work.
-
-      To apply the Apache License to your work, attach the following
-      boilerplate notice, with the fields enclosed by brackets "[]"
-      replaced with your own identifying information. (Don't include
-      the brackets!)  The text should be enclosed in the appropriate
-      comment syntax for the file format. We also recommend that a
-      file or class name and description of purpose be included on the
-      same "printed page" as the copyright notice for easier
-      identification within third-party archives.
-
-   Copyright 2010-2012 Coda Hale and Yammer, Inc.
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-
diff --git a/solr/licenses/metrics-jvm-NOTICE.txt 
b/solr/licenses/metrics-jvm-NOTICE.txt
deleted file mode 100644
index b4c6298472f..00000000000
--- a/solr/licenses/metrics-jvm-NOTICE.txt
+++ /dev/null
@@ -1,12 +0,0 @@
-Metrics
-Copyright 2010-2013 Coda Hale and Yammer, Inc.
-
-This product includes software developed by Coda Hale and Yammer, Inc.
-
-This product includes code derived from the JSR-166 project 
(ThreadLocalRandom, Striped64,
-LongAdder), which was released with the following comments:
-
-    Written by Doug Lea with assistance from members of JCP JSR-166
-    Expert Group and released to the public domain, as explained at
-    http://creativecommons.org/publicdomain/zero/1.0/
-
diff --git 
a/solr/licenses/netty-tcnative-boringssl-static-2.0.73.Final.jar.sha1 
b/solr/licenses/netty-tcnative-boringssl-static-2.0.73.Final.jar.sha1
new file mode 100644
index 00000000000..0bea976dc34
--- /dev/null
+++ b/solr/licenses/netty-tcnative-boringssl-static-2.0.73.Final.jar.sha1
@@ -0,0 +1 @@
+de7380a74a7611e9937dd2106abfde5b405dbd15
diff --git a/solr/licenses/netty-tcnative-classes-2.0.73.Final.jar.sha1 
b/solr/licenses/netty-tcnative-classes-2.0.73.Final.jar.sha1
new file mode 100644
index 00000000000..048e8b91eba
--- /dev/null
+++ b/solr/licenses/netty-tcnative-classes-2.0.73.Final.jar.sha1
@@ -0,0 +1 @@
+f50d875a46e4a7768f35dbc26fb796fd791b8b09
diff --git a/solr/licenses/profiler-1.1.1.jar.sha1 
b/solr/licenses/profiler-1.1.1.jar.sha1
deleted file mode 100644
index 329a0deb7d3..00000000000
--- a/solr/licenses/profiler-1.1.1.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-c92a582728b09b47de38b1b97bd2e5b0c8cd553c
diff --git a/solr/licenses/profiler-LICENSE-ASL.txt 
b/solr/licenses/profiler-LICENSE-ASL.txt
deleted file mode 100644
index a6fe25e5c8d..00000000000
--- a/solr/licenses/profiler-LICENSE-ASL.txt
+++ /dev/null
@@ -1,345 +0,0 @@
-
-                                 Apache License
-                           Version 2.0, January 2004
-                        http://www.apache.org/licenses/
-
-   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
-   1. Definitions.
-
-      "License" shall mean the terms and conditions for use, reproduction,
-      and distribution as defined by Sections 1 through 9 of this document.
-
-      "Licensor" shall mean the copyright owner or entity authorized by
-      the copyright owner that is granting the License.
-
-      "Legal Entity" shall mean the union of the acting entity and all
-      other entities that control, are controlled by, or are under common
-      control with that entity. For the purposes of this definition,
-      "control" means (i) the power, direct or indirect, to cause the
-      direction or management of such entity, whether by contract or
-      otherwise, or (ii) ownership of fifty percent (50%) or more of the
-      outstanding shares, or (iii) beneficial ownership of such entity.
-
-      "You" (or "Your") shall mean an individual or Legal Entity
-      exercising permissions granted by this License.
-
-      "Source" form shall mean the preferred form for making modifications,
-      including but not limited to software source code, documentation
-      source, and configuration files.
-
-      "Object" form shall mean any form resulting from mechanical
-      transformation or translation of a Source form, including but
-      not limited to compiled object code, generated documentation,
-      and conversions to other media types.
-
-      "Work" shall mean the work of authorship, whether in Source or
-      Object form, made available under the License, as indicated by a
-      copyright notice that is included in or attached to the work
-      (an example is provided in the Appendix below).
-
-      "Derivative Works" shall mean any work, whether in Source or Object
-      form, that is based on (or derived from) the Work and for which the
-      editorial revisions, annotations, elaborations, or other modifications
-      represent, as a whole, an original work of authorship. For the purposes
-      of this License, Derivative Works shall not include works that remain
-      separable from, or merely link (or bind by name) to the interfaces of,
-      the Work and Derivative Works thereof.
-
-      "Contribution" shall mean any work of authorship, including
-      the original version of the Work and any modifications or additions
-      to that Work or Derivative Works thereof, that is intentionally
-      submitted to Licensor for inclusion in the Work by the copyright owner
-      or by an individual or Legal Entity authorized to submit on behalf of
-      the copyright owner. For the purposes of this definition, "submitted"
-      means any form of electronic, verbal, or written communication sent
-      to the Licensor or its representatives, including but not limited to
-      communication on electronic mailing lists, source code control systems,
-      and issue tracking systems that are managed by, or on behalf of, the
-      Licensor for the purpose of discussing and improving the Work, but
-      excluding communication that is conspicuously marked or otherwise
-      designated in writing by the copyright owner as "Not a Contribution."
-
-      "Contributor" shall mean Licensor and any individual or Legal Entity
-      on behalf of whom a Contribution has been received by Licensor and
-      subsequently incorporated within the Work.
-
-   2. Grant of Copyright License. Subject to the terms and conditions of
-      this License, each Contributor hereby grants to You a perpetual,
-      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
-      copyright license to reproduce, prepare Derivative Works of,
-      publicly display, publicly perform, sublicense, and distribute the
-      Work and such Derivative Works in Source or Object form.
-
-   3. Grant of Patent License. Subject to the terms and conditions of
-      this License, each Contributor hereby grants to You a perpetual,
-      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
-      (except as stated in this section) patent license to make, have made,
-      use, offer to sell, sell, import, and otherwise transfer the Work,
-      where such license applies only to those patent claims licensable
-      by such Contributor that are necessarily infringed by their
-      Contribution(s) alone or by combination of their Contribution(s)
-      with the Work to which such Contribution(s) was submitted. If You
-      institute patent litigation against any entity (including a
-      cross-claim or counterclaim in a lawsuit) alleging that the Work
-      or a Contribution incorporated within the Work constitutes direct
-      or contributory patent infringement, then any patent licenses
-      granted to You under this License for that Work shall terminate
-      as of the date such litigation is filed.
-
-   4. Redistribution. You may reproduce and distribute copies of the
-      Work or Derivative Works thereof in any medium, with or without
-      modifications, and in Source or Object form, provided that You
-      meet the following conditions:
-
-      (a) You must give any other recipients of the Work or
-          Derivative Works a copy of this License; and
-
-      (b) You must cause any modified files to carry prominent notices
-          stating that You changed the files; and
-
-      (c) You must retain, in the Source form of any Derivative Works
-          that You distribute, all copyright, patent, trademark, and
-          attribution notices from the Source form of the Work,
-          excluding those notices that do not pertain to any part of
-          the Derivative Works; and
-
-      (d) If the Work includes a "NOTICE" text file as part of its
-          distribution, then any Derivative Works that You distribute must
-          include a readable copy of the attribution notices contained
-          within such NOTICE file, excluding those notices that do not
-          pertain to any part of the Derivative Works, in at least one
-          of the following places: within a NOTICE text file distributed
-          as part of the Derivative Works; within the Source form or
-          documentation, if provided along with the Derivative Works; or,
-          within a display generated by the Derivative Works, if and
-          wherever such third-party notices normally appear. The contents
-          of the NOTICE file are for informational purposes only and
-          do not modify the License. You may add Your own attribution
-          notices within Derivative Works that You distribute, alongside
-          or as an addendum to the NOTICE text from the Work, provided
-          that such additional attribution notices cannot be construed
-          as modifying the License.
-
-      You may add Your own copyright statement to Your modifications and
-      may provide additional or different license terms and conditions
-      for use, reproduction, or distribution of Your modifications, or
-      for any such Derivative Works as a whole, provided Your use,
-      reproduction, and distribution of the Work otherwise complies with
-      the conditions stated in this License.
-
-   5. Submission of Contributions. Unless You explicitly state otherwise,
-      any Contribution intentionally submitted for inclusion in the Work
-      by You to the Licensor shall be under the terms and conditions of
-      this License, without any additional terms or conditions.
-      Notwithstanding the above, nothing herein shall supersede or modify
-      the terms of any separate license agreement you may have executed
-      with Licensor regarding such Contributions.
-
-   6. Trademarks. This License does not grant permission to use the trade
-      names, trademarks, service marks, or product names of the Licensor,
-      except as required for reasonable and customary use in describing the
-      origin of the Work and reproducing the content of the NOTICE file.
-
-   7. Disclaimer of Warranty. Unless required by applicable law or
-      agreed to in writing, Licensor provides the Work (and each
-      Contributor provides its Contributions) on an "AS IS" BASIS,
-      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-      implied, including, without limitation, any warranties or conditions
-      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
-      PARTICULAR PURPOSE. You are solely responsible for determining the
-      appropriateness of using or redistributing the Work and assume any
-      risks associated with Your exercise of permissions under this License.
-
-   8. Limitation of Liability. In no event and under no legal theory,
-      whether in tort (including negligence), contract, or otherwise,
-      unless required by applicable law (such as deliberate and grossly
-      negligent acts) or agreed to in writing, shall any Contributor be
-      liable to You for damages, including any direct, indirect, special,
-      incidental, or consequential damages of any character arising as a
-      result of this License or out of the use or inability to use the
-      Work (including but not limited to damages for loss of goodwill,
-      work stoppage, computer failure or malfunction, or any and all
-      other commercial damages or losses), even if such Contributor
-      has been advised of the possibility of such damages.
-
-   9. Accepting Warranty or Additional Liability. While redistributing
-      the Work or Derivative Works thereof, You may choose to offer,
-      and charge a fee for, acceptance of support, warranty, indemnity,
-      or other liability obligations and/or rights consistent with this
-      License. However, in accepting such obligations, You may act only
-      on Your own behalf and on Your sole responsibility, not on behalf
-      of any other Contributor, and only if You agree to indemnify,
-      defend, and hold each Contributor harmless for any liability
-      incurred by, or claims asserted against, such Contributor by reason
-      of your accepting any such warranty or additional liability.
-
-   END OF TERMS AND CONDITIONS
-
-   APPENDIX: How to apply the Apache License to your work.
-
-      To apply the Apache License to your work, attach the following
-      boilerplate notice, with the fields enclosed by brackets "[]"
-      replaced with your own identifying information. (Don't include
-      the brackets!)  The text should be enclosed in the appropriate
-      comment syntax for the file format. We also recommend that a
-      file or class name and description of purpose be included on the
-      same "printed page" as the copyright notice for easier
-      identification within third-party archives.
-
-   Copyright [yyyy] [name of copyright owner]
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-
--------------------------------------------------------------------------------
-This project bundles some components that are also licensed under the Apache
-License Version 2.0:
-
-audience-annotations-0.12.0
-caffeine-2.9.3
-commons-beanutils-1.9.4
-commons-cli-1.4
-commons-collections-3.2.2
-commons-digester-2.1
-commons-io-2.11.0
-commons-lang3-3.12.0
-commons-logging-1.2
-commons-validator-1.7
-error_prone_annotations-2.10.0
-jackson-annotations-2.16.2
-jackson-core-2.16.2
-jackson-databind-2.16.2
-jackson-dataformat-csv-2.16.2
-jackson-datatype-jdk8-2.16.2
-jackson-jaxrs-base-2.16.2
-jackson-jaxrs-json-provider-2.16.2
-jackson-module-afterburner-2.16.2
-jackson-module-jaxb-annotations-2.16.2
-jackson-module-scala_2.13-2.16.2
-jackson-module-scala_2.12-2.16.2
-jakarta.validation-api-2.0.2
-javassist-3.29.2-GA
-jetty-client-9.4.54.v20240208
-jetty-continuation-9.4.54.v20240208
-jetty-http-9.4.54.v20240208
-jetty-io-9.4.54.v20240208
-jetty-security-9.4.54.v20240208
-jetty-server-9.4.54.v20240208
-jetty-servlet-9.4.54.v20240208
-jetty-servlets-9.4.54.v20240208
-jetty-util-9.4.54.v20240208
-jetty-util-ajax-9.4.54.v20240208
-jose4j-0.9.4
-lz4-java-1.8.0
-maven-artifact-3.9.6
-metrics-core-4.1.12.1
-metrics-core-2.2.0
-netty-buffer-4.1.111.Final
-netty-codec-4.1.111.Final
-netty-common-4.1.111.Final
-netty-handler-4.1.111.Final
-netty-resolver-4.1.111.Final
-netty-transport-4.1.111.Final
-netty-transport-classes-epoll-4.1.111.Final
-netty-transport-native-epoll-4.1.111.Final
-netty-transport-native-unix-common-4.1.111.Final
-opentelemetry-proto-1.0.0-alpha
-plexus-utils-3.5.1
-reflections-0.10.2
-reload4j-1.2.25
-rocksdbjni-7.9.2
-scala-collection-compat_2.12-2.10.0
-scala-collection-compat_2.13-2.10.0
-scala-library-2.12.19
-scala-library-2.13.14
-scala-logging_2.12-3.9.5
-scala-logging_2.13-3.9.5
-scala-reflect-2.12.19
-scala-reflect-2.13.14
-scala-java8-compat_2.12-1.0.2
-scala-java8-compat_2.13-1.0.2
-snappy-java-1.1.10.5
-swagger-annotations-2.2.8
-zookeeper-3.8.4
-zookeeper-jute-3.8.4
-
-===============================================================================
-This product bundles various third-party components under other open source
-licenses. This section summarizes those components and their licenses.
-See licenses/ for text of these licenses.
-
----------------------------------------
-Eclipse Distribution License - v 1.0
-see: licenses/eclipse-distribution-license-1.0
-
-jakarta.activation-api-1.2.2
-jakarta.xml.bind-api-2.3.3
-
----------------------------------------
-Eclipse Public License - v 2.0
-see: licenses/eclipse-public-license-2.0
-
-jakarta.annotation-api-1.3.5
-jakarta.ws.rs-api-2.1.6
-hk2-api-2.6.1
-hk2-locator-2.6.1
-hk2-utils-2.6.1
-osgi-resource-locator-1.0.3
-aopalliance-repackaged-2.6.1
-jakarta.inject-2.6.1
-jersey-client-2.39.1
-jersey-common-2.39.1
-jersey-container-servlet-2.39.1
-jersey-container-servlet-core-2.39.1
-jersey-hk2-2.39.1
-jersey-server-2.39.1
-
----------------------------------------
-CDDL 1.1 + GPLv2 with classpath exception
-see: licenses/CDDL+GPL-1.1
-
-javax.activation-api-1.2.0
-javax.annotation-api-1.3.2
-javax.servlet-api-3.1.0
-javax.ws.rs-api-2.1.1
-jaxb-api-2.3.1
-activation-1.1.1
-
----------------------------------------
-MIT License
-
-argparse4j-0.7.0, see: licenses/argparse-MIT
-checker-qual-3.19.0, see: licenses/checker-qual-MIT
-jopt-simple-5.0.4, see: licenses/jopt-simple-MIT
-slf4j-api-1.7.36, see: licenses/slf4j-MIT
-slf4j-reload4j-1.7.36, see: licenses/slf4j-MIT
-pcollections-4.0.1, see: licenses/pcollections-MIT
-
----------------------------------------
-BSD 2-Clause
-
-zstd-jni-1.5.6-3 see: licenses/zstd-jni-BSD-2-clause
-
----------------------------------------
-BSD 3-Clause
-
-jline-3.25.1, see: licenses/jline-BSD-3-clause
-jsr305-3.0.2, see: licenses/jsr305-BSD-3-clause
-paranamer-2.8, see: licenses/paranamer-BSD-3-clause
-protobuf-java-3.23.4, see: licenses/protobuf-java-BSD-3-clause
-
----------------------------------------
-Do What The F*ck You Want To Public License
-see: licenses/DWTFYWTPL
-
-reflections-0.10.2
\ No newline at end of file
diff --git a/solr/licenses/profiler-NOTICE.txt 
b/solr/licenses/profiler-NOTICE.txt
deleted file mode 100644
index e69de29bb2d..00000000000
diff --git 
a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/update/processor/KafkaRequestMirroringHandler.java
 
b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/update/processor/KafkaRequestMirroringHandler.java
index 384cd6ef998..2201d1ca633 100644
--- 
a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/update/processor/KafkaRequestMirroringHandler.java
+++ 
b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/update/processor/KafkaRequestMirroringHandler.java
@@ -18,6 +18,7 @@ package org.apache.solr.crossdc.update.processor;
 
 import java.lang.invoke.MethodHandles;
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.crossdc.common.KafkaMirroringSink;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
 import org.apache.solr.crossdc.common.MirroringException;
@@ -51,7 +52,8 @@ public class KafkaRequestMirroringHandler implements 
RequestMirroringHandler {
     }
     // TODO: Enforce external version constraint for consistent update 
replication (cross-cluster)
     final MirroredSolrRequest<?> mirroredRequest =
-        new MirroredSolrRequest<>(MirroredSolrRequest.Type.UPDATE, 1, request, 
System.nanoTime());
+        new MirroredSolrRequest<>(
+            MirroredSolrRequest.Type.UPDATE, 1, request, 
TimeSource.CURRENT_TIME.getTimeNs());
     try {
       sink.submit(mirroredRequest);
     } catch (MirroringException exception) {

Reply via email to