This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 9ad786fc53a SOLR-18077: CrossDC Consumer - out-of-order Kafka
partition processing (#4125)
9ad786fc53a is described below
commit 9ad786fc53ab3a9688bec48785d6c5bf09637cc3
Author: Andrzej Białecki <[email protected]>
AuthorDate: Fri Feb 20 17:11:01 2026 +0100
SOLR-18077: CrossDC Consumer - out-of-order Kafka partition processing
(#4125)
---
changelog/unreleased/solr-18077.yml | 9 +++++
.../manager/SolrAndKafkaIntegrationTest.java | 41 +++++++++++++++++++++-
.../solr/crossdc/common/KafkaMirroringSink.java | 5 ++-
.../pages/cross-dc-replication.adoc | 8 +++--
4 files changed, 59 insertions(+), 4 deletions(-)
diff --git a/changelog/unreleased/solr-18077.yml
b/changelog/unreleased/solr-18077.yml
new file mode 100644
index 00000000000..a4747905be7
--- /dev/null
+++ b/changelog/unreleased/solr-18077.yml
@@ -0,0 +1,9 @@
+# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
+title: "CrossDC Consumer: fix potential out-of-order Kafka partition
processing"
+type: fixed # added, changed, fixed, deprecated, removed, dependency_update,
security, other
+authors:
+ - name: Andrzej Bialecki
+ nick: ab
+links:
+ - name: SOLR-18077
+ url: https://issues.apache.org/jira/browse/SOLR-18077
diff --git
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
index 39970d7b4ca..e30ff999a0b 100644
---
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
+++
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
@@ -25,6 +25,7 @@ import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -33,6 +34,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.stream.IntStream;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
@@ -122,8 +124,10 @@ public class SolrAndKafkaIntegrationTest extends
SolrCloudTestCase {
};
kafkaCluster.start();
- kafkaCluster.createTopic(TOPIC, 1, 1);
+ kafkaCluster.createTopic(TOPIC, 10, 1);
+ // ensure small batches to test multi-partition ordering
+ System.setProperty("batchSizeBytes", "128");
System.setProperty("solr.crossdc.topicName", TOPIC);
System.setProperty("solr.crossdc.bootstrapServers",
kafkaCluster.bootstrapServers());
System.setProperty(INDEX_UNMIRRORABLE_DOCS, "false");
@@ -183,6 +187,7 @@ public class SolrAndKafkaIntegrationTest extends
SolrCloudTestCase {
Thread.setDefaultUncaughtExceptionHandler(uceh);
}
+ @Test
public void testFullCloudToCloud() throws Exception {
CloudSolrClient client = solrCluster1.getSolrClient(COLLECTION);
SolrInputDocument doc = new SolrInputDocument();
@@ -198,6 +203,7 @@ public class SolrAndKafkaIntegrationTest extends
SolrCloudTestCase {
assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 1);
}
+ @Test
public void testProducerToCloud() throws Exception {
Properties properties = new Properties();
properties.put("bootstrap.servers", kafkaCluster.bootstrapServers());
@@ -228,6 +234,39 @@ public class SolrAndKafkaIntegrationTest extends
SolrCloudTestCase {
producer.close();
}
+ private static final String LOREM_IPSUM =
+ "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod
tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam,
quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo
consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse
cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non
proident, sunt in culpa qui officia deserunt mollit anim id est laborum.";
+
+ @Test
+ public void testStrictOrdering() throws Exception {
+ CloudSolrClient client = solrCluster1.getSolrClient();
+ int NUM_DOCS = 5000;
+ // delay deletes by this many docs
+ int DELTA = 100;
+ for (int i = 0; i < NUM_DOCS; i++) {
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", "id-" + i);
+ doc.addField("text", "some test with a relatively long field. " +
LOREM_IPSUM);
+
+ client.add(COLLECTION, doc);
+ if (i >= DELTA) {
+ client.deleteById(COLLECTION, "id-" + (i - DELTA));
+ }
+ }
+
+ // send the remaining deletes in random order
+ ArrayList<Integer> ids = new ArrayList<>(DELTA);
+ IntStream.range(0, DELTA).forEach(i -> ids.add(i));
+ Collections.shuffle(ids, random());
+ for (Integer id : ids) {
+ client.deleteById(COLLECTION, "id-" + (NUM_DOCS - DELTA + id));
+ }
+
+ client.commit(COLLECTION);
+
+ assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 0);
+ }
+
@Test
@Ignore("This relies on collection properties and I don't see where they are
read anymore")
public void testMirroringUpdateProcessor() throws Exception {
diff --git
a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
index 1a8ad622c0b..70f48457814 100644
---
a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
+++
b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
@@ -92,12 +92,15 @@ public class KafkaMirroringSink implements
RequestMirroringSink, Closeable {
}
final long enqueueStartNanos = System.nanoTime();
+ // required for multi-partition topics to preserve ordering of requests
for a collection
+ final String recordKey =
+ request.getSolrRequest() != null ?
request.getSolrRequest().getCollection() : null;
// Create Producer record
try {
producer.send(
- new ProducerRecord<>(topicName, request),
+ new ProducerRecord<>(topicName, recordKey, request),
(metadata, exception) -> {
if (exception != null) {
log.error(
diff --git
a/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc
b/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc
index bb235dcdc7c..07e1b52ecc1 100644
---
a/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc
+++
b/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc
@@ -130,7 +130,7 @@ Optional configuration properties:
`solr.crossdc.retryBackoffMs` _<integer>_:: The amount of time to wait before
attempting to retry a failed request to a given topic partition.
`solr.crossdc.deliveryTimeoutMS` _<integer>_:: Updates sent to the Kafka queue
will be failed before the number of retries has been exhausted if the timeout
configured by delivery.timeout.ms expires first
`solr.crossdc.maxRequestSizeBytes` _<integer>_:: The maximum size of a Kafka
queue request in bytes - limits the number of requests that will be sent over
the queue in a single batch.
-`solr.crossdc.dlqTopicName` _<string>_: If not empty then requests that failed
processing `maxAttempts` times will be sent to a "dead letter queue" topic in
Kafka (must exist if configured).
+`solr.crossdc.dlqTopicName` _<string>_:: If not empty then requests that
failed processing `maxAttempts` times will be sent to a "dead letter queue"
topic in Kafka (must exist if configured).
`solr.crossdc.mirrorCommits` _<boolean>_:: If `true` then standalone commit
requests will be mirrored, otherwise they will be processed only locally.
`solr.crossdc.expandDbq` _<enum>_ :: If set to `expand` (default) then
Delete-By-Query will be expanded before mirroring into series of Delete-By-Id,
which may help with correct processing of out-of-order requests on the consumer
side.
If set to `none` then Delete-By-Query requests will be mirrored as-is.
@@ -212,4 +212,8 @@ Setting the `solr.crossdc.enabled` system property or
xref:collection-management
- When `solr.crossdc.expandDbq` property is set to `expand` (default) then
Delete-By-Query converts to a series of Delete-By-Id, which can be much less
efficient for queries matching large numbers of documents.
Setting this property to `none` results in forwarding a real Delete-By-Query -
this reduces the amount of data to mirror but may cause different results due
to the potential re-ordering of failed & re-submitted requests between Consumer
and the target Solr.
- When `solr.crossdc.collapseUpdates` is set to `all` then multiple requests
containing a mix of add and delete ops will be collapsed into a single outgoing
request.
-This will cause the original ordering of add / delete ops to be lost (because
Solr processing of an update request always processes all add ops first, and
only then the delete ops), which may affect the final outcome when some of the
ops refer to the same document ids.
\ No newline at end of file
+This will cause the original ordering of add / delete ops to be lost (because
Solr processing of an update request always processes all add ops first, and
only then the delete ops), which may affect the final outcome when some of the
ops refer to the same document ids.
+- When the Kafka topic used for mirroring has multiple partitions the CrossDC
Producer and Consumer guarantee strict ordering of updates ONLY within the same
collection.
+In other words, when a multi-partition topic is used for mirroring there's no
guarantee of a strict global request ordering across
+collections, which normally should not be an issue. However, if a strict
global ordering across collections is required then
+the mirroring topic must use a single partition.