This is an automated email from the ASF dual-hosted git repository.

sigram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new a2370b58380 SOLR-18077: Fix incorrect request reuse, fix unit tests. (#4396)
a2370b58380 is described below

commit a2370b58380e61633e0e1b65312bb33201eea3cd
Author: Andrzej Białecki <[email protected]>
AuthorDate: Wed May 6 08:36:59 2026 +0200

    SOLR-18077: Fix incorrect request reuse, fix unit tests. (#4396)
---
 solr/cross-dc-manager/build.gradle                 |   1 +
 .../manager/consumer/KafkaCrossDcConsumer.java     |  30 +++-
 .../crossdc/manager/consumer/PartitionManager.java |  14 +-
 .../manager/SolrAndKafkaIntegrationTest.java       | 161 +++++++++++++++++++--
 4 files changed, 188 insertions(+), 18 deletions(-)

diff --git a/solr/cross-dc-manager/build.gradle 
b/solr/cross-dc-manager/build.gradle
index 4ce538f67f7..14ed70a5dbc 100644
--- a/solr/cross-dc-manager/build.gradle
+++ b/solr/cross-dc-manager/build.gradle
@@ -35,6 +35,7 @@ dependencies {
   implementation libs.opentelemetry.sdk.metrics
   implementation libs.eclipse.jetty.server
   implementation libs.eclipse.jetty.ee10.servlet
+  implementation libs.google.guava
   implementation libs.jakarta.servlet.api
   implementation libs.slf4j.api
   runtimeOnly libs.google.protobuf.javautils
diff --git 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
index c3b5bdb3a70..d368151c6af 100644
--- 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
+++ 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
@@ -75,6 +75,8 @@ import org.slf4j.LoggerFactory;
 public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  public static final String PROP_TOPIC_DEBUG = 
"solr.crossdc.consumer.topic.debug";
+
   private final KafkaConsumer<String, MirroredSolrRequest<?>> kafkaConsumer;
   private final AdminClient adminClient;
   private final CountDownLatch startLatch;
@@ -101,6 +103,8 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
 
   private volatile boolean running = false;
 
+  private boolean topicDebug = 
Boolean.parseBoolean(System.getProperty(PROP_TOPIC_DEBUG, "false"));
+
   /**
    * Supplier for creating and managing a working CloudSolrClient instance. 
This class ensures that
    * the CloudSolrClient instance doesn't try to use its {@link
@@ -175,6 +179,7 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
             conf.get(CrossDcConf.COLLAPSE_UPDATES), 
CrossDcConf.CollapseUpdates.PARTIAL);
     this.maxCollapseRecords = 
conf.getInt(KafkaCrossDcConf.MAX_COLLAPSE_RECORDS);
     this.startLatch = startLatch;
+
     final Properties kafkaConsumerProps = new Properties();
 
     kafkaConsumerProps.put(
@@ -375,6 +380,9 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
       ConsumerRecord<String, MirroredSolrRequest<?>> lastRecord = null;
 
       for (TopicPartition partition : records.partitions()) {
+        if (log.isTraceEnabled()) {
+          log.trace("Checking partition {}", partition.partition());
+        }
         List<ConsumerRecord<String, MirroredSolrRequest<?>>> partitionRecords =
             records.records(partition);
 
@@ -396,19 +404,31 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
 
             metrics.incrementInputMsgCounter();
             lastRecord = requestRecord;
-            MirroredSolrRequest<?> req = requestRecord.value();
-            SolrRequest<?> solrReq = req.getSolrRequest();
-            MirroredSolrRequest.Type type = req.getType();
+            final MirroredSolrRequest<?> req = requestRecord.value();
+            final SolrRequest<?> solrReq = req.getSolrRequest();
+            final MirroredSolrRequest.Type type = req.getType();
 
             if (type != MirroredSolrRequest.Type.UPDATE) {
               String action = solrReq.getParams().get("action", "unknown");
               metrics.incrementInputReqCounter(type.name(), action);
             }
 
-            ModifiableSolrParams params = new 
ModifiableSolrParams(solrReq.getParams());
+            final ModifiableSolrParams params = new 
ModifiableSolrParams(solrReq.getParams());
             if (log.isTraceEnabled()) {
               log.trace("-- picked type={}, params={}", req.getType(), params);
             }
+            if (topicDebug) {
+              solrReq.addHeader("topic.debug", "true");
+              solrReq.addHeader("record.topic", requestRecord.topic());
+              solrReq.addHeader("record.partition", 
String.valueOf(requestRecord.partition()));
+              solrReq.addHeader("record.offset", 
String.valueOf(requestRecord.offset()));
+              solrReq.addHeader("record.timestamp", 
String.valueOf(requestRecord.timestamp()));
+              solrReq.addHeader("record.key", requestRecord.key());
+              solrReq.addHeader("workUnit.nextOffset", 
String.valueOf(workUnit.nextOffset));
+              solrReq.addHeader("workUnit.partition", 
String.valueOf(workUnit.partition));
+              solrReq.addHeader("workUnit.topic", workUnit.topic);
+              solrReq.addHeader("workUnit.items", 
String.valueOf(workUnit.workItems.size()));
+            }
 
             // determine if it's an UPDATE with deletes, or if the existing 
batch has deletes
             boolean hasDeletes = false;
@@ -450,6 +470,7 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
               if (updateReqBatch == null) {
                 // just initialize
                 updateReqBatch = new UpdateRequest();
+                updateReqBatch.addHeaders(solrReq.getHeaders());
               } else {
                 if (collapseUpdates == CrossDcConf.CollapseUpdates.NONE) {
                   throw new RuntimeException("Can't collapse requests.");
@@ -490,6 +511,7 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
 
           if (updateReqBatch != null) {
             sendBatch(updateReqBatch, MirroredSolrRequest.Type.UPDATE, 
lastRecord, workUnit);
+            updateReqBatch = null;
           }
           try {
             partitionManager.checkForOffsetUpdates(partition);
diff --git 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/PartitionManager.java
 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/PartitionManager.java
index c1004528ab6..c93740f25ea 100644
--- 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/PartitionManager.java
+++ 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/PartitionManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.crossdc.manager.consumer;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayDeque;
 import java.util.HashSet;
@@ -44,13 +45,16 @@ public class PartitionManager {
     final Queue<WorkUnit> partitionQueue = new ArrayDeque<>();
   }
 
-  static class WorkUnit {
-    final TopicPartition partition;
-    Set<Future<?>> workItems = new HashSet<>();
+  @VisibleForTesting
+  public static class WorkUnit {
+    final int partition;
+    final String topic;
+    final Set<Future<?>> workItems = new HashSet<>();
     long nextOffset;
 
-    public WorkUnit(TopicPartition partition) {
-      this.partition = partition;
+    WorkUnit(TopicPartition partition) {
+      this.partition = partition.partition();
+      this.topic = partition.topic();
     }
   }
 
diff --git 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
index e837be5fec3..73b6bd8abd5 100644
--- 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
+++ 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
@@ -16,8 +16,12 @@
  */
 package org.apache.solr.crossdc.manager;
 
+import static org.apache.solr.crossdc.common.CrossDcConf.COLLAPSE_UPDATES;
+import static org.apache.solr.crossdc.common.KafkaCrossDcConf.BATCH_SIZE_BYTES;
+import static 
org.apache.solr.crossdc.common.KafkaCrossDcConf.BOOTSTRAP_SERVERS;
 import static 
org.apache.solr.crossdc.common.KafkaCrossDcConf.DEFAULT_MAX_REQUEST_SIZE;
 import static 
org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE_DOCS;
+import static org.apache.solr.crossdc.common.KafkaCrossDcConf.TOPIC_NAME;
 
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
@@ -28,14 +32,18 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.stream.IntStream;
 import org.apache.commons.io.IOUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -45,6 +53,7 @@ import org.apache.lucene.tests.util.QuickPatchThreadsFilter;
 import org.apache.solr.SolrIgnoredThreadsFilter;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.jetty.HttpJettySolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -56,14 +65,19 @@ import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.common.util.Utils;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
 import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
 import org.apache.solr.crossdc.manager.consumer.Consumer;
+import org.apache.solr.crossdc.manager.consumer.ConsumerMetrics;
+import org.apache.solr.crossdc.manager.consumer.KafkaCrossDcConsumer;
+import org.apache.solr.crossdc.manager.consumer.PartitionManager;
 import org.apache.solr.util.SolrKafkaTestsIgnoredThreadsFilter;
 import org.junit.After;
 import org.junit.Before;
@@ -90,11 +104,64 @@ public class SolrAndKafkaIntegrationTest extends 
SolrCloudTestCase {
   private static final int NUM_BROKERS = 1;
   public EmbeddedKafkaCluster kafkaCluster;
 
+  private static class ConsumerBatch {
+    final String kafkaTopic;
+    final int partitionId;
+    final MirroredSolrRequest.Type type;
+    final String collection;
+    final Map<String, String> headers;
+    final Set<String> addIds = new HashSet<>();
+    final String json;
+
+    public ConsumerBatch(final MirroredSolrRequest.Type type, final 
SolrRequest<?> solrRequest) {
+      this.kafkaTopic = solrRequest.getHeaders().get("record.topic");
+      this.partitionId = 
Integer.parseInt(solrRequest.getHeaders().get("record.partition"));
+      this.type = type;
+      this.collection = solrRequest.getCollection();
+      this.headers = solrRequest.getHeaders();
+      if (solrRequest instanceof UpdateRequest) {
+        UpdateRequest updateReq = (UpdateRequest) solrRequest;
+        json =
+            Utils.toJSONString(
+                Map.of("params", updateReq.getParams(), "add", 
updateReq.getDocuments()));
+        updateReq.getDocuments().forEach(doc -> 
addIds.add(doc.getFieldValue("id").toString()));
+      } else {
+        json =
+            Utils.toJSONString(
+                Map.of("params", solrRequest.getParams(), "class", 
solrRequest.getClass()));
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "ConsumerBatch{"
+          + "kafkaTopic='"
+          + kafkaTopic
+          + '\''
+          + ", partitionId="
+          + partitionId
+          + ", type="
+          + type
+          + ", collection='"
+          + collection
+          + '\''
+          + ", headers="
+          + headers
+          + '\''
+          + ", json='"
+          + json
+          + '\''
+          + '}';
+    }
+  }
+
   protected volatile MiniSolrCloudCluster solrCluster1;
   protected volatile MiniSolrCloudCluster solrCluster2;
 
   protected volatile Consumer consumer;
 
+  private List<ConsumerBatch> consumerBatches;
+
   private static final String TOPIC = "topic1";
 
   private static final String COLLECTION = "collection1";
@@ -112,7 +179,28 @@ public class SolrAndKafkaIntegrationTest extends 
SolrCloudTestCase {
     Thread.setDefaultUncaughtExceptionHandler(
         (t, e) -> log.error("Uncaught exception in thread {}", t, e));
     System.setProperty("otel.metrics.exporter", "prometheus");
-    consumer = new Consumer();
+    System.setProperty(KafkaCrossDcConsumer.PROP_TOPIC_DEBUG, "true");
+    consumerBatches = new ArrayList<>();
+    consumer =
+        new Consumer() {
+          @Override
+          protected CrossDcConsumer getCrossDcConsumer(
+              final KafkaCrossDcConf conf,
+              final ConsumerMetrics metrics,
+              final CountDownLatch startLatch) {
+            return new KafkaCrossDcConsumer(conf, metrics, startLatch) {
+              @Override
+              public void sendBatch(
+                  final SolrRequest<? extends SolrResponse> solrReqBatch,
+                  final MirroredSolrRequest.Type type,
+                  final ConsumerRecord<String, MirroredSolrRequest<?>> 
lastRecord,
+                  final PartitionManager.WorkUnit workUnit) {
+                consumerBatches.add(new ConsumerBatch(type, solrReqBatch));
+                super.sendBatch(solrReqBatch, type, lastRecord, workUnit);
+              }
+            };
+          }
+        };
     Properties config = new Properties();
 
     kafkaCluster =
@@ -124,13 +212,15 @@ public class SolrAndKafkaIntegrationTest extends 
SolrCloudTestCase {
         };
     kafkaCluster.start();
 
-    kafkaCluster.createTopic(TOPIC, 10, 1);
+    // create many partitions to test for re-ordered reads
+    kafkaCluster.createTopic(TOPIC, 3, 1);
 
     // ensure small batches to test multi-partition ordering
-    System.setProperty("batchSizeBytes", "128");
-    System.setProperty("solr.crossdc.topicName", TOPIC);
-    System.setProperty("solr.crossdc.bootstrapServers", 
kafkaCluster.bootstrapServers());
+    System.setProperty(BATCH_SIZE_BYTES, "100");
+    System.setProperty(TOPIC_NAME, TOPIC);
+    System.setProperty(BOOTSTRAP_SERVERS, kafkaCluster.bootstrapServers());
     System.setProperty(INDEX_UNMIRRORABLE_DOCS, "false");
+    System.setProperty(COLLAPSE_UPDATES, "none");
 
     solrCluster1 =
         configureCluster(1).addConfig("conf", 
getFile("configs/cloud-minimal/conf")).configure();
@@ -238,10 +328,62 @@ public class SolrAndKafkaIntegrationTest extends 
SolrCloudTestCase {
       "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod 
tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, 
quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo 
consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse 
cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non 
proident, sunt in culpa qui officia deserunt mollit anim id est laborum.";
 
   @Test
-  @Ignore("SOLR-18077")
+  public void testPartitioning() throws Exception {
+    CollectionAdminRequest.Create create =
+        CollectionAdminRequest.createCollection(ALT_COLLECTION, "conf", 1, 1);
+    create.process(solrCluster1.getSolrClient());
+    create.process(solrCluster2.getSolrClient());
+    solrCluster1.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+    solrCluster2.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+
+    CloudSolrClient client = solrCluster1.getSolrClient();
+    int NUM_DOCS = 200;
+    for (int i = 0; i < NUM_DOCS; i++) {
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", "id-" + i);
+      doc.addField("id_i", i);
+      doc.addField("text", "some test with a relatively long field. " + 
LOREM_IPSUM);
+      doc.addField("collection_t", COLLECTION);
+
+      client.add(COLLECTION, doc);
+
+      doc = new SolrInputDocument();
+      doc.addField("id", "id-" + i);
+      doc.addField("id_i", i);
+      doc.addField("text", "some test with a relatively long field. " + 
LOREM_IPSUM);
+      doc.addField("collection_t", ALT_COLLECTION);
+
+      client.add(ALT_COLLECTION, doc);
+    }
+    client.commit(COLLECTION);
+    client.commit(ALT_COLLECTION);
+    // check that updates to different collections were always sent to the 
same partition
+    Map<Integer, String> partitionsPerCol = new HashMap<>();
+    Map<String, Set<String>> docsPerCol = new HashMap<>();
+    for (ConsumerBatch batch : consumerBatches) {
+      String collection =
+          partitionsPerCol.computeIfAbsent(batch.partitionId, k -> 
batch.collection);
+      docsPerCol.computeIfAbsent(collection, col -> new 
HashSet<>()).addAll(batch.addIds);
+      assertEquals(
+          "request in partition "
+              + batch.partitionId
+              + " has wrong collection "
+              + batch.collection
+              + ": "
+              + batch
+              + "\npartitions: "
+              + partitionsPerCol,
+          collection,
+          batch.collection);
+    }
+    docsPerCol.forEach(
+        (col, ids) -> assertEquals("incorrect count in collection " + col, 
NUM_DOCS, ids.size()));
+  }
+
+  @Test
   public void testStrictOrdering() throws Exception {
     CloudSolrClient client = solrCluster1.getSolrClient();
-    int NUM_DOCS = 5000;
+    int NUM_DOCS = 1000;
     // delay deletes by this many docs
     int DELTA = 100;
     for (int i = 0; i < NUM_DOCS; i++) {
@@ -454,11 +596,12 @@ public class SolrAndKafkaIntegrationTest extends 
SolrCloudTestCase {
     boolean foundUpdates = false;
     for (int i = 0; i < 100; i++) {
       client.commit(collection);
-      results = client.query(collection, new SolrQuery(query));
+      results =
+          client.query(collection, new SolrQuery(CommonParams.Q, query, 
CommonParams.FL, "*"));
       if (results.getResults().getNumFound() == expectedNumDocs) {
         foundUpdates = true;
       } else {
-        Thread.sleep(200);
+        Thread.sleep(300);
       }
     }
 

Reply via email to