This is an automated email from the ASF dual-hosted git repository.

sigram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new 3aa6aa2085a SOLR-18252: Fix flaky test by avoiding race conditions. (#4462)
3aa6aa2085a is described below

commit 3aa6aa2085ac3ec5b90d181a7db7577c57318d4a
Author: Andrzej Białecki <[email protected]>
AuthorDate: Mon May 25 17:38:58 2026 +0200

    SOLR-18252: Fix flaky test by avoiding race conditions. (#4462)
    
    Co-authored by: hoss.
---
 .../manager/SolrAndKafkaIntegrationTest.java       | 45 +++++++++++++++++++---
 1 file changed, 39 insertions(+), 6 deletions(-)

diff --git a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
index 8e8b31d69d2..e42ac3844a3 100644
--- a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
+++ b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
@@ -38,9 +38,12 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 import org.apache.commons.io.IOUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -70,6 +73,7 @@ import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
@@ -79,6 +83,7 @@ import 
org.apache.solr.crossdc.manager.consumer.ConsumerMetrics;
 import org.apache.solr.crossdc.manager.consumer.KafkaCrossDcConsumer;
 import org.apache.solr.crossdc.manager.consumer.PartitionManager;
 import org.apache.solr.util.SolrKafkaTestsIgnoredThreadsFilter;
+import org.apache.solr.util.TimeOut;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -160,7 +165,7 @@ public class SolrAndKafkaIntegrationTest extends 
SolrCloudTestCase {
 
   protected volatile Consumer consumer;
 
-  private List<ConsumerBatch> consumerBatches;
+  private BlockingQueue<ConsumerBatch> consumerBatches;
 
   private static final String TOPIC = "topic1";
 
@@ -180,7 +185,7 @@ public class SolrAndKafkaIntegrationTest extends 
SolrCloudTestCase {
         (t, e) -> log.error("Uncaught exception in thread {}", t, e));
     System.setProperty("otel.metrics.exporter", "prometheus");
     System.setProperty(KafkaCrossDcConsumer.PROP_TOPIC_DEBUG, "true");
-    consumerBatches = new ArrayList<>();
+    consumerBatches = new LinkedBlockingQueue<>();
     consumer =
         new Consumer() {
           @Override
@@ -195,7 +200,7 @@ public class SolrAndKafkaIntegrationTest extends 
SolrCloudTestCase {
                   final MirroredSolrRequest.Type type,
                   final ConsumerRecord<String, MirroredSolrRequest<?>> 
lastRecord,
                   final PartitionManager.WorkUnit workUnit) {
-                consumerBatches.add(new ConsumerBatch(type, solrReqBatch));
+                consumerBatches.offer(new ConsumerBatch(type, solrReqBatch));
                 super.sendBatch(solrReqBatch, type, lastRecord, workUnit);
               }
             };
@@ -357,10 +362,27 @@ public class SolrAndKafkaIntegrationTest extends 
SolrCloudTestCase {
     }
     client.commit(COLLECTION);
     client.commit(ALT_COLLECTION);
+
+    assertCluster2EventuallyHasDocs(COLLECTION, "*:*", NUM_DOCS);
+    assertCluster2EventuallyHasDocs(ALT_COLLECTION, "*:*", NUM_DOCS);
+
     // check that updates to different collections were always sent to the 
same partition
     Map<Integer, String> partitionsPerCol = new HashMap<>();
     Map<String, Set<String>> docsPerCol = new HashMap<>();
-    for (ConsumerBatch batch : consumerBatches) {
+    int batchCount = 0;
+    TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS, 
TimeSource.CURRENT_TIME);
+    while (!timeOut.hasTimedOut()) {
+      final ConsumerBatch batch = consumerBatches.poll(2, TimeUnit.SECONDS);
+      if (batch == null) {
+        int totalDocsSeen = 
docsPerCol.values().stream().mapToInt(Set::size).sum();
+        if (totalDocsSeen == NUM_DOCS * 2) {
+          // we've collected all expected docs
+          break;
+        } else {
+          continue; // keep waiting, it was just a longer pause
+        }
+      }
+      batchCount++;
       String collection =
           partitionsPerCol.computeIfAbsent(batch.partitionId, k -> 
batch.collection);
       docsPerCol.computeIfAbsent(collection, col -> new 
HashSet<>()).addAll(batch.addIds);
@@ -376,8 +398,19 @@ public class SolrAndKafkaIntegrationTest extends 
SolrCloudTestCase {
           collection,
           batch.collection);
     }
-    docsPerCol.forEach(
-        (col, ids) -> assertEquals("incorrect count in collection " + col, 
NUM_DOCS, ids.size()));
+    if (timeOut.hasTimedOut()) {
+      fail("timed out waiting for batches");
+    }
+    assertTrue("No batches were received from consumer", batchCount > 0);
+    assertEquals("Should have processed both collections", 2, 
docsPerCol.size());
+    assertTrue("COLLECTION not found in results", 
docsPerCol.containsKey(COLLECTION));
+    assertTrue("ALT_COLLECTION not found in results", 
docsPerCol.containsKey(ALT_COLLECTION));
+    assertEquals(
+        "incorrect count in collection " + COLLECTION, NUM_DOCS, 
docsPerCol.get(COLLECTION).size());
+    assertEquals(
+        "incorrect count in collection " + ALT_COLLECTION,
+        NUM_DOCS,
+        docsPerCol.get(ALT_COLLECTION).size());
   }
 
   @Test

Reply via email to