This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 50bc6ff916a SOLR-18064: CrossDC Producer - add more detailed metrics. (#4106)
50bc6ff916a is described below
commit 50bc6ff916a858e40167de31b34586f623a15320
Author: Andrzej Białecki <[email protected]>
AuthorDate: Fri Feb 20 12:29:03 2026 +0100
SOLR-18064: CrossDC Producer - add more detailed metrics. (#4106)
---
changelog/unreleased/solr-18064.yml | 9 ++
.../update/processor/MirroringUpdateProcessor.java | 21 +++-
.../crossdc/update/processor/ProducerMetrics.java | 100 +++++++++++++++
.../processor/MirroringUpdateProcessorTest.java | 134 +++++++++++++++++++--
4 files changed, 254 insertions(+), 10 deletions(-)
diff --git a/changelog/unreleased/solr-18064.yml b/changelog/unreleased/solr-18064.yml
new file mode 100644
index 00000000000..9684c482e7e
--- /dev/null
+++ b/changelog/unreleased/solr-18064.yml
@@ -0,0 +1,9 @@
+# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
+title: CrossDC Producer - add more detailed metrics.
+type: changed # added, changed, fixed, deprecated, removed, dependency_update, security, other
+authors:
+ - name: Andrzej Bialecki
+ nick: ab
+links:
+ - name: SOLR-18064
+ url: https://issues.apache.org/jira/browse/SOLR-18064
diff --git
a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/update/processor/MirroringUpdateProcessor.java
b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/update/processor/MirroringUpdateProcessor.java
index 05ec9217c10..3d10b3beb51 100644
---
a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/update/processor/MirroringUpdateProcessor.java
+++
b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/update/processor/MirroringUpdateProcessor.java
@@ -146,6 +146,7 @@ public class MirroringUpdateProcessor extends
UpdateRequestProcessor {
producerMetrics.getDocumentSize().record(estimatedDocSizeInBytes);
final boolean tooLargeForKafka = estimatedDocSizeInBytes >
maxMirroringDocSizeBytes;
if (tooLargeForKafka && !indexUnmirrorableDocs) {
+ producerMetrics.getDocumentTooLarge().inc();
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST,
"Update exceeds the doc-size limit and is unmirrorable. id="
@@ -181,9 +182,11 @@ public class MirroringUpdateProcessor extends
UpdateRequestProcessor {
try {
requestMirroringHandler.mirror(mirrorRequest);
producerMetrics.getSubmitted().inc();
+ producerMetrics.getSubmittedAdd().inc();
} catch (Exception e) {
log.error("mirror submit failed", e);
producerMetrics.getSubmitError().inc();
+ producerMetrics.getSubmittedAddError().inc();
throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
}
}
@@ -250,7 +253,7 @@ public class MirroringUpdateProcessor extends
UpdateRequestProcessor {
return;
}
super.processDelete(cmd); // let this throw to prevent mirroring invalid
requests
-
+ producerMetrics.getLocal().inc();
if (doMirroring) {
boolean isLeader = false;
UpdateRequest mirrorRequest = createMirrorRequest();
@@ -271,8 +274,11 @@ public class MirroringUpdateProcessor extends
UpdateRequestProcessor {
try {
requestMirroringHandler.mirror(mirrorRequest);
+ producerMetrics.getSubmitted().inc();
+ producerMetrics.getSubmittedDeleteById().inc();
} catch (Exception e) {
log.error("mirror submit failed", e);
+ producerMetrics.getSubmittedDeleteByIdError().inc();
throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
}
}
@@ -289,8 +295,12 @@ public class MirroringUpdateProcessor extends
UpdateRequestProcessor {
try {
requestMirroringHandler.mirror(mirrorRequest);
+ producerMetrics.getSubmitted().inc();
+ producerMetrics.getSubmittedDeleteByQuery().inc();
} catch (Exception e) {
log.error("mirror submit failed", e);
+ producerMetrics.getSubmitError().inc();
+ producerMetrics.getSubmittedDeleteByQueryError().inc();
throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
}
}
@@ -390,7 +400,10 @@ public class MirroringUpdateProcessor extends
UpdateRequestProcessor {
@Override
public void processCommit(CommitUpdateCommand cmd) throws IOException {
log.debug("process commit cmd={}", cmd);
- if (next != null) next.processCommit(cmd);
+ if (next != null) {
+ next.processCommit(cmd);
+ producerMetrics.getLocal().inc();
+ }
if (!mirrorCommits) {
return;
}
@@ -424,8 +437,12 @@ public class MirroringUpdateProcessor extends
UpdateRequestProcessor {
log.debug(" --doMirroring commit req={}", req);
try {
requestMirroringHandler.mirror(req);
+ producerMetrics.getSubmitted().inc();
+ producerMetrics.getSubmittedCommit().inc();
} catch (Exception e) {
log.error("mirror submit failed", e);
+ producerMetrics.getSubmitError().inc();
+ producerMetrics.getSubmittedCommitError().inc();
throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
}
diff --git
a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/update/processor/ProducerMetrics.java
b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/update/processor/ProducerMetrics.java
index 65fd30668a8..73e52a62ff9 100644
---
a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/update/processor/ProducerMetrics.java
+++
b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/update/processor/ProducerMetrics.java
@@ -31,6 +31,14 @@ public class ProducerMetrics {
private final AttributedLongCounter localError;
private final AttributedLongCounter submitted;
private final AttributedLongCounter submitError;
+ private final AttributedLongCounter submittedAdd;
+ private final AttributedLongCounter submittedAddError;
+ private final AttributedLongCounter submittedDeleteById;
+ private final AttributedLongCounter submittedDeleteByIdError;
+ private final AttributedLongCounter submittedDeleteByQuery;
+ private final AttributedLongCounter submittedDeleteByQueryError;
+ private final AttributedLongCounter submittedCommit;
+ private final AttributedLongCounter submittedCommitError;
private final AttributedLongHistogram documentSize;
private final AttributedLongCounter documentTooLarge;
@@ -45,6 +53,22 @@ public class ProducerMetrics {
solrMetricsContext.longCounter(
"solr_core_crossdc_producer_submitted",
"The number of documents submitted to the Kafka topic (success or
error)");
+ var localSubmittedAdd =
+ solrMetricsContext.longCounter(
+ "solr_core_crossdc_producer_submitted_add",
+ "The number of add requests submitted to the Kafka topic (success
or error)");
+ var localSubmittedDbi =
+ solrMetricsContext.longCounter(
+ "solr_core_crossdc_producer_submitted_delete_by_id",
+ "The number of Delete-By-Id requests submitted to the Kafka topic
(success or error)");
+ var localSubmittedDbq =
+ solrMetricsContext.longCounter(
+ "solr_core_crossdc_producer_submitted_delete_by_query",
+ "The number of Delete-By-Query requests submitted to the Kafka
topic (success or error)");
+ var localSubmittedCommit =
+ solrMetricsContext.longCounter(
+ "solr_core_crossdc_producer_submitted_commit",
+ "The number of standalone Commit requests submitted to the Kafka
topic (success or error)");
var histogramDocSizes =
solrMetricsContext.longHistogram(
"solr_core_crossdc_producer_document_size",
@@ -67,6 +91,30 @@ public class ProducerMetrics {
this.submitError =
new AttributedLongCounter(
localSubmitted, attributes.toBuilder().put(TYPE_ATTR,
"error").build());
+ this.submittedAdd =
+ new AttributedLongCounter(
+ localSubmittedAdd, attributes.toBuilder().put(TYPE_ATTR,
"success").build());
+ this.submittedAddError =
+ new AttributedLongCounter(
+ localSubmittedAdd, attributes.toBuilder().put(TYPE_ATTR,
"error").build());
+ this.submittedDeleteById =
+ new AttributedLongCounter(
+ localSubmittedDbi, attributes.toBuilder().put(TYPE_ATTR,
"success").build());
+ this.submittedDeleteByIdError =
+ new AttributedLongCounter(
+ localSubmittedDbi, attributes.toBuilder().put(TYPE_ATTR,
"error").build());
+ this.submittedDeleteByQuery =
+ new AttributedLongCounter(
+ localSubmittedDbq, attributes.toBuilder().put(TYPE_ATTR,
"success").build());
+ this.submittedDeleteByQueryError =
+ new AttributedLongCounter(
+ localSubmittedDbq, attributes.toBuilder().put(TYPE_ATTR,
"error").build());
+ this.submittedCommit =
+ new AttributedLongCounter(
+ localSubmittedCommit, attributes.toBuilder().put(TYPE_ATTR,
"success").build());
+ this.submittedCommitError =
+ new AttributedLongCounter(
+ localSubmittedCommit, attributes.toBuilder().put(TYPE_ATTR,
"error").build());
this.documentSize = new AttributedLongHistogram(histogramDocSizes,
attributes);
this.documentTooLarge = new AttributedLongCounter(tooLargeErrors,
attributes);
}
@@ -94,6 +142,58 @@ public class ProducerMetrics {
return this.submitError;
}
+ /** Counter representing the number of add requests submitted to the Kafka
topic. */
+ public AttributedLongCounter getSubmittedAdd() {
+ return this.submittedAdd;
+ }
+
+ /**
+ * Counter representing the number of add requests that were not submitted
to the Kafka topic
+ * because of exception during execution.
+ */
+ public AttributedLongCounter getSubmittedAddError() {
+ return this.submittedAddError;
+ }
+
+ /** Counter representing the number of delete-by-id requests submitted to
the Kafka topic. */
+ public AttributedLongCounter getSubmittedDeleteById() {
+ return this.submittedDeleteById;
+ }
+
+ /**
+ * Counter representing the number of delete-by-id requests that were not
submitted to the Kafka
+ * topic because of exception during execution.
+ */
+ public AttributedLongCounter getSubmittedDeleteByIdError() {
+ return this.submittedDeleteByIdError;
+ }
+
+ /** Counter representing the number of delete-by-query requests submitted to
the Kafka topic. */
+ public AttributedLongCounter getSubmittedDeleteByQuery() {
+ return this.submittedDeleteByQuery;
+ }
+
+ /**
+ * Counter representing the number of delete-by-query requests that were not
submitted to the
+ * Kafka topic because of exception during execution.
+ */
+ public AttributedLongCounter getSubmittedDeleteByQueryError() {
+ return this.submittedDeleteByQueryError;
+ }
+
+ /** Counter representing the number of standalone Commit requests submitted
to the Kafka topic. */
+ public AttributedLongCounter getSubmittedCommit() {
+ return this.submittedCommit;
+ }
+
+ /**
+ * Counter representing the number of standalone Commit requests that were
not submitted to the
+ * Kafka topic because of exception during execution.
+ */
+ public AttributedLongCounter getSubmittedCommitError() {
+ return this.submittedCommitError;
+ }
+
/** Histogram of the processed document size. */
public AttributedLongHistogram getDocumentSize() {
return this.documentSize;
diff --git
a/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/update/processor/MirroringUpdateProcessorTest.java
b/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/update/processor/MirroringUpdateProcessorTest.java
index 82cab81992b..b8e01a62efa 100644
---
a/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/update/processor/MirroringUpdateProcessorTest.java
+++
b/solr/modules/cross-dc/src/test/org/apache/solr/crossdc/update/processor/MirroringUpdateProcessorTest.java
@@ -17,6 +17,8 @@
package org.apache.solr.crossdc.update.processor;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -26,6 +28,8 @@ import static org.mockito.Mockito.when;
import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.request.SolrQuery;
import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -77,6 +81,7 @@ public class MirroringUpdateProcessorTest extends
SolrTestCaseJ4 {
private ZkStateReader zkStateReader;
private Replica replica;
private ProducerMetrics producerMetrics;
+ private Map<String, AtomicLong> counters = new ConcurrentHashMap<>();
@BeforeClass
public static void ensureWorkingMockito() {
@@ -91,6 +96,8 @@ public class MirroringUpdateProcessorTest extends
SolrTestCaseJ4 {
req = mock(SolrQueryRequestBase.class);
when(req.getParams()).thenReturn(new ModifiableSolrParams());
+ counters.clear();
+
requestMock = mock(UpdateRequest.class);
addUpdateCommand = new AddUpdateCommand(req);
@@ -120,31 +127,98 @@ public class MirroringUpdateProcessorTest extends
SolrTestCaseJ4 {
producerMetrics =
spy(
new ProducerMetrics(mock(SolrMetricsContext.class), core) {
- private final AttributedLongCounter counterMock =
mock(AttributedLongCounter.class);
+ Map<String, AttributedLongCounter> counterMap = new
ConcurrentHashMap<>();
+
+ AttributedLongCounter getMockCounter(String label) {
+ return counterMap.computeIfAbsent(
+ label,
+ k -> {
+ AttributedLongCounter mockCounter =
mock(AttributedLongCounter.class);
+ doAnswer(
+ inv -> {
+ counters
+ .computeIfAbsent(k, k2 -> new AtomicLong())
+ .addAndGet(inv.getArgument(0));
+ return null;
+ })
+ .when(mockCounter)
+ .add(anyLong());
+ doAnswer(
+ inv -> {
+ counters
+ .computeIfAbsent(k, k2 -> new AtomicLong())
+ .incrementAndGet();
+ return null;
+ })
+ .when(mockCounter)
+ .inc();
+ return mockCounter;
+ });
+ }
@Override
public AttributedLongCounter getLocal() {
- return counterMock;
+ return getMockCounter("local");
}
@Override
public AttributedLongCounter getLocalError() {
- return counterMock;
+ return getMockCounter("localError");
}
@Override
public AttributedLongCounter getSubmitted() {
- return counterMock;
+ return getMockCounter("submitted");
}
@Override
public AttributedLongCounter getDocumentTooLarge() {
- return counterMock;
+ return getMockCounter("documentTooLarge");
}
@Override
public AttributedLongCounter getSubmitError() {
- return counterMock;
+ return getMockCounter("submitError");
+ }
+
+ @Override
+ public AttributedLongCounter getSubmittedAdd() {
+ return getMockCounter("submittedAdd");
+ }
+
+ @Override
+ public AttributedLongCounter getSubmittedAddError() {
+ return getMockCounter("submittedAddError");
+ }
+
+ @Override
+ public AttributedLongCounter getSubmittedDeleteById() {
+ return getMockCounter("submittedDeleteById");
+ }
+
+ @Override
+ public AttributedLongCounter getSubmittedDeleteByIdError() {
+ return getMockCounter("submittedDeleteByIdError");
+ }
+
+ @Override
+ public AttributedLongCounter getSubmittedDeleteByQuery() {
+ return getMockCounter("submittedDeleteByQuery");
+ }
+
+ @Override
+ public AttributedLongCounter getSubmittedDeleteByQueryError() {
+ return getMockCounter("submittedDeleteByQueryError");
+ }
+
+ @Override
+ public AttributedLongCounter getSubmittedCommit() {
+ return getMockCounter("submittedCommit");
+ }
+
+ @Override
+ public AttributedLongCounter getSubmittedCommitError() {
+ return getMockCounter("submittedCommitError");
}
@Override
@@ -209,6 +283,10 @@ public class MirroringUpdateProcessorTest extends
SolrTestCaseJ4 {
try {
processor.processDelete(deleteUpdateCommand);
verify(requestMirroringHandler, times(1)).mirror(requestMock);
+ assertEquals(1, counters.get("local").get());
+ assertEquals(1, counters.get("submitted").get());
+ assertEquals(1, counters.get("submittedDeleteByQuery").get());
+ assertNull(counters.get("submittedDeleteById"));
} catch (Exception e) {
fail("IOException should not be thrown");
}
@@ -224,6 +302,9 @@ public class MirroringUpdateProcessorTest extends
SolrTestCaseJ4 {
when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
processor.processAdd(addUpdateCommand);
verify(requestMirroringHandler, times(1)).mirror(requestMock);
+ assertEquals(1, counters.get("local").get());
+ assertEquals(1, counters.get("submitted").get());
+ assertEquals(1, counters.get("submittedAdd").get());
} catch (IOException e) {
fail("IOException should not be thrown");
} catch (Exception e) {
@@ -242,6 +323,11 @@ public class MirroringUpdateProcessorTest extends
SolrTestCaseJ4 {
deleteUpdateCommand.setId("test");
processor.processDelete(deleteUpdateCommand);
verify(requestMirroringHandler, times(1)).mirror(requestMock);
+ // this is somewhat counter-intuitive, but the deleteById is IGNORED when query is set.
+ assertEquals(1, counters.get("local").get());
+ assertEquals(1, counters.get("submitted").get());
+ assertEquals(1, counters.get("submittedDeleteByQuery").get());
+ assertNull(counters.get("submittedDeleteById"));
} catch (Exception e) {
fail("IOException should not be thrown");
}
@@ -255,6 +341,8 @@ public class MirroringUpdateProcessorTest extends
SolrTestCaseJ4 {
processor.processCommit(commitUpdateCommand);
verify(next).processCommit(commitUpdateCommand);
verify(requestMirroringHandler, times(0)).mirror(requestMock);
+ assertEquals(1, counters.get("local").get());
+ assertNull(counters.get("submitted"));
} catch (Exception e) {
fail("IOException should not be thrown: " + e);
}
@@ -263,12 +351,14 @@ public class MirroringUpdateProcessorTest extends
SolrTestCaseJ4 {
@Test
public void processCommitOnlyNonLeader() {
try {
- // should skip if processing on non-leader replica
+ // should skip mirroring if processing on non-leader replica
when(replica.getName()).thenReturn("foobar");
when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
processor.processCommit(commitUpdateCommand);
verify(next).processCommit(commitUpdateCommand);
verify(requestMirroringHandler, times(0)).mirror(requestMock);
+ assertEquals(1, counters.get("local").get());
+ assertNull(counters.get("submitted"));
} catch (Exception e) {
fail("IOException should not be thrown: " + e);
}
@@ -295,6 +385,9 @@ public class MirroringUpdateProcessorTest extends
SolrTestCaseJ4 {
processor.processCommit(commitUpdateCommand);
verify(next).processCommit(commitUpdateCommand);
verify(requestMirroringHandler, times(1)).mirror(captor.capture());
+ assertEquals(1, counters.get("local").get());
+ assertEquals(1, counters.get("submitted").get());
+ assertEquals(1, counters.get("submittedCommit").get());
UpdateRequest req = captor.getValue();
assertNotNull(req.getParams());
SolrParams params = req.getParams();
@@ -331,6 +424,8 @@ public class MirroringUpdateProcessorTest extends
SolrTestCaseJ4 {
processor.processCommit(commitUpdateCommand);
verify(next).processCommit(commitUpdateCommand);
verify(requestMirroringHandler, times(0)).mirror(requestMock);
+ assertEquals(1, counters.get("local").get());
+ assertNull(counters.get("submitted"));
} catch (Exception e) {
fail("Exception should not be thrown: " + e);
}
@@ -348,6 +443,9 @@ public class MirroringUpdateProcessorTest extends
SolrTestCaseJ4 {
processor.processAdd(cmd);
verify(next).processAdd(cmd);
verify(requestMirroringHandler).mirror(requestMock);
+ assertEquals(1, counters.get("local").get());
+ assertEquals(1, counters.get("submitted").get());
+ assertEquals(1, counters.get("submittedAdd").get());
}
@Test
@@ -384,6 +482,7 @@ public class MirroringUpdateProcessorTest extends
SolrTestCaseJ4 {
assertThrows(
SolrException.class, () ->
mirroringUpdateProcessorWithLimit.processAdd(addUpdateCommand));
+ assertEquals(1, counters.get("documentTooLarge").get());
}
@Test
@@ -391,6 +490,9 @@ public class MirroringUpdateProcessorTest extends
SolrTestCaseJ4 {
when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
processor.processAdd(addUpdateCommand);
verify(requestMirroringHandler, times(1)).mirror(any());
+ assertEquals(1, counters.get("local").get());
+ assertEquals(1, counters.get("submitted").get());
+ assertEquals(1, counters.get("submittedAdd").get());
}
@Test
@@ -398,6 +500,9 @@ public class MirroringUpdateProcessorTest extends
SolrTestCaseJ4 {
when(cloudDesc.getCoreNodeName()).thenReturn("replica2");
processor.processAdd(addUpdateCommand);
verify(requestMirroringHandler, times(0)).mirror(any());
+ assertEquals(1, counters.get("local").get());
+ assertNull(counters.get("submitted"));
+ assertNull(counters.get("submittedAdd"));
}
@Test
@@ -405,6 +510,9 @@ public class MirroringUpdateProcessorTest extends
SolrTestCaseJ4 {
when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
processor.processDelete(deleteUpdateCommand);
verify(requestMirroringHandler, times(1)).mirror(any());
+ assertEquals(1, counters.get("local").get());
+ assertEquals(1, counters.get("submitted").get());
+ assertEquals(1, counters.get("submittedDeleteByQuery").get());
}
@Test
@@ -434,6 +542,12 @@ public class MirroringUpdateProcessorTest extends
SolrTestCaseJ4 {
verify(requestMirroringHandler, times(1)).mirror(updateRequest);
assertEquals("missing dbq", 1, updateRequest.getDeleteQuery().size());
assertEquals("dbq value", "id:test*",
updateRequest.getDeleteQuery().get(0));
+ // verify the metrics
+ assertEquals(1, counters.get("local").get());
+ assertEquals(1, counters.get("submittedDeleteByQuery").get());
+ assertEquals(1, counters.get("submitted").get());
+ // no expansion
+ assertNull(counters.get("submittedDeleteById"));
}
@Test
@@ -445,11 +559,15 @@ public class MirroringUpdateProcessorTest extends
SolrTestCaseJ4 {
processor.processAdd(addUpdateCommand);
SolrQuery query = new SolrQuery();
- query.setQuery("*:*");
+ query.setQuery("id:*");
query.setRows(1000);
query.setSort("id", SolrQuery.ORDER.asc);
processor.processDelete(deleteUpdateCommand);
+ assertEquals(2, counters.get("local").get());
+ assertEquals(2, counters.get("submitted").get());
+ assertEquals(1, counters.get("submittedAdd").get());
+ assertEquals(1, counters.get("submittedDeleteByQuery").get());
}
@Test