This is an automated email from the ASF dual-hosted git repository.
sigram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 3aa6aa2085a SOLR-18252: Fix flaky test by avoiding race conditions. (#4462)
3aa6aa2085a is described below
commit 3aa6aa2085ac3ec5b90d181a7db7577c57318d4a
Author: Andrzej Białecki <[email protected]>
AuthorDate: Mon May 25 17:38:58 2026 +0200
SOLR-18252: Fix flaky test by avoiding race conditions. (#4462)
Co-authored by: hoss.
---
.../manager/SolrAndKafkaIntegrationTest.java | 45 +++++++++++++++++++---
1 file changed, 39 insertions(+), 6 deletions(-)
diff --git a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
index 8e8b31d69d2..e42ac3844a3 100644
--- a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
+++ b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
@@ -38,9 +38,12 @@ import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -70,6 +73,7 @@ import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
@@ -79,6 +83,7 @@ import
org.apache.solr.crossdc.manager.consumer.ConsumerMetrics;
import org.apache.solr.crossdc.manager.consumer.KafkaCrossDcConsumer;
import org.apache.solr.crossdc.manager.consumer.PartitionManager;
import org.apache.solr.util.SolrKafkaTestsIgnoredThreadsFilter;
+import org.apache.solr.util.TimeOut;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -160,7 +165,7 @@ public class SolrAndKafkaIntegrationTest extends
SolrCloudTestCase {
protected volatile Consumer consumer;
- private List<ConsumerBatch> consumerBatches;
+ private BlockingQueue<ConsumerBatch> consumerBatches;
private static final String TOPIC = "topic1";
@@ -180,7 +185,7 @@ public class SolrAndKafkaIntegrationTest extends
SolrCloudTestCase {
(t, e) -> log.error("Uncaught exception in thread {}", t, e));
System.setProperty("otel.metrics.exporter", "prometheus");
System.setProperty(KafkaCrossDcConsumer.PROP_TOPIC_DEBUG, "true");
- consumerBatches = new ArrayList<>();
+ consumerBatches = new LinkedBlockingQueue<>();
consumer =
new Consumer() {
@Override
@@ -195,7 +200,7 @@ public class SolrAndKafkaIntegrationTest extends
SolrCloudTestCase {
final MirroredSolrRequest.Type type,
final ConsumerRecord<String, MirroredSolrRequest<?>>
lastRecord,
final PartitionManager.WorkUnit workUnit) {
- consumerBatches.add(new ConsumerBatch(type, solrReqBatch));
+ consumerBatches.offer(new ConsumerBatch(type, solrReqBatch));
super.sendBatch(solrReqBatch, type, lastRecord, workUnit);
}
};
@@ -357,10 +362,27 @@ public class SolrAndKafkaIntegrationTest extends
SolrCloudTestCase {
}
client.commit(COLLECTION);
client.commit(ALT_COLLECTION);
+
+ assertCluster2EventuallyHasDocs(COLLECTION, "*:*", NUM_DOCS);
+ assertCluster2EventuallyHasDocs(ALT_COLLECTION, "*:*", NUM_DOCS);
+
// check that updates to different collections were always sent to the
same partition
Map<Integer, String> partitionsPerCol = new HashMap<>();
Map<String, Set<String>> docsPerCol = new HashMap<>();
- for (ConsumerBatch batch : consumerBatches) {
+ int batchCount = 0;
+ TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS,
TimeSource.CURRENT_TIME);
+ while (!timeOut.hasTimedOut()) {
+ final ConsumerBatch batch = consumerBatches.poll(2, TimeUnit.SECONDS);
+ if (batch == null) {
+ int totalDocsSeen =
docsPerCol.values().stream().mapToInt(Set::size).sum();
+ if (totalDocsSeen == NUM_DOCS * 2) {
+ // we've collected all expected docs
+ break;
+ } else {
+ continue; // keep waiting, it was just a longer pause
+ }
+ }
+ batchCount++;
String collection =
partitionsPerCol.computeIfAbsent(batch.partitionId, k ->
batch.collection);
docsPerCol.computeIfAbsent(collection, col -> new
HashSet<>()).addAll(batch.addIds);
@@ -376,8 +398,19 @@ public class SolrAndKafkaIntegrationTest extends
SolrCloudTestCase {
collection,
batch.collection);
}
- docsPerCol.forEach(
- (col, ids) -> assertEquals("incorrect count in collection " + col,
NUM_DOCS, ids.size()));
+ if (timeOut.hasTimedOut()) {
+ fail("timed out waiting for batches");
+ }
+ assertTrue("No batches were received from consumer", batchCount > 0);
+ assertEquals("Should have processed both collections", 2,
docsPerCol.size());
+ assertTrue("COLLECTION not found in results",
docsPerCol.containsKey(COLLECTION));
+ assertTrue("ALT_COLLECTION not found in results",
docsPerCol.containsKey(ALT_COLLECTION));
+ assertEquals(
+ "incorrect count in collection " + COLLECTION, NUM_DOCS,
docsPerCol.get(COLLECTION).size());
+ assertEquals(
+ "incorrect count in collection " + ALT_COLLECTION,
+ NUM_DOCS,
+ docsPerCol.get(ALT_COLLECTION).size());
}
@Test