This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 623159d3e6c SOLR-17804: re-create SolrClient if closed. (#3411)
623159d3e6c is described below
commit 623159d3e6c81e4b36f297dc7bd0174518076692
Author: Andrzej Białecki <[email protected]>
AuthorDate: Wed Jul 9 13:40:31 2025 +0200
SOLR-17804: re-create SolrClient if closed. (#3411)
---
.../manager/consumer/KafkaCrossDcConsumer.java | 93 ++++++++++++++++++----
.../messageprocessor/SolrMessageProcessor.java | 16 ++--
.../crossdc/manager/DeleteByQueryToIdTest.java | 2 +-
.../crossdc/manager/SimpleSolrIntegrationTest.java | 2 +-
.../manager/consumer/KafkaCrossDcConsumerTest.java | 32 +++++++-
.../messageprocessor/SolrMessageProcessorTest.java | 2 +-
.../messageprocessor/TestMessageProcessor.java | 2 +-
7 files changed, 119 insertions(+), 30 deletions(-)
diff --git
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
index 6ff3edb727a..bf1ba691f26 100644
---
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
+++
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
@@ -31,6 +31,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -85,7 +87,7 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
private final int maxCollapseRecords;
private final SolrMessageProcessor messageProcessor;
- protected final CloudSolrClient solrClient;
+ protected SolrClientSupplier solrClientSupplier;
private final ThreadPoolExecutor executor;
@@ -95,6 +97,68 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
private final BlockingQueue<Runnable> queue = new BlockingQueue<>(10);
+ /**
+ * Supplier for creating and managing a working CloudSolrClient instance.
This class ensures that
+ * the CloudSolrClient instance doesn't try to use its {@link
+ * org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider} in a
closed state, which may
+ * happen if e.g. the ZooKeeper connection is lost. When this happens the
CloudSolrClient is
+ * re-created to ensure it can continue to function properly.
+ *
+ * <p>TODO: this functionality should be moved to the CloudSolrClient itself.
+ */
+ public static class SolrClientSupplier implements Supplier<CloudSolrClient>,
AutoCloseable {
+ private final AtomicReference<CloudSolrClient> solrClientRef = new
AtomicReference<>();
+ private final String zkConnectString;
+
+ public SolrClientSupplier(String zkConnectString) {
+ this.zkConnectString = zkConnectString;
+ }
+
+ @Override
+ public void close() {
+ IOUtils.closeQuietly(solrClientRef.get());
+ }
+
+ protected CloudSolrClient createSolrClient() {
+ log.debug("Creating new SolrClient...");
+ return new CloudSolrClient.Builder(
+ Collections.singletonList(zkConnectString), Optional.empty())
+ .build();
+ }
+
+ @Override
+ public CloudSolrClient get() {
+ CloudSolrClient existingClient = solrClientRef.get();
+ if (existingClient == null) {
+ synchronized (solrClientRef) {
+ if (solrClientRef.get() == null) {
+ log.info("Initializing Solr client.");
+ solrClientRef.set(createSolrClient());
+ }
+ return solrClientRef.get();
+ }
+ }
+ if (existingClient.getClusterStateProvider().isClosed()) {
+ // lock out other threads and re-open the client if its
ClusterStateProvider was closed
+ synchronized (solrClientRef) {
+ // refresh and check again
+ existingClient = solrClientRef.get();
+ if (existingClient.getClusterStateProvider().isClosed()) {
+ log.info("Re-creating Solr client because its ClusterStateProvider
was closed.");
+ CloudSolrClient newClient = createSolrClient();
+ solrClientRef.set(newClient);
+ IOUtils.closeQuietly(existingClient);
+ return newClient;
+ } else {
+ return existingClient;
+ }
+ }
+ } else {
+ return existingClient;
+ }
+ }
+ }
+
/**
* @param conf The Kafka consumer configuration
* @param startLatch To inform the caller when the Consumer has started
@@ -157,7 +221,7 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
r -> new Thread(r, "KafkaCrossDcConsumerWorker"));
executor.prestartAllCoreThreads();
- solrClient = createSolrClient(conf);
+ solrClientSupplier = createSolrClientSupplier(conf);
messageProcessor = createSolrMessageProcessor();
@@ -170,8 +234,12 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
log.info("Created Kafka resubmit producer");
}
+ protected SolrClientSupplier createSolrClientSupplier(KafkaCrossDcConf conf)
{
+ return new
SolrClientSupplier(conf.get(KafkaCrossDcConf.ZK_CONNECT_STRING));
+ }
+
protected SolrMessageProcessor createSolrMessageProcessor() {
- return new SolrMessageProcessor(solrClient, resubmitRequest -> 0L);
+ return new SolrMessageProcessor(solrClientSupplier, resubmitRequest -> 0L);
}
public KafkaConsumer<String, MirroredSolrRequest<?>>
createKafkaConsumer(Properties properties) {
@@ -179,6 +247,10 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
properties, new StringDeserializer(), new
MirroredSolrRequestSerializer());
}
+ protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf)
{
+ return new KafkaMirroringSink(conf);
+ }
+
/**
* This is where the magic happens.
*
@@ -219,7 +291,7 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
log.warn("Failed to close kafka mirroring sink", e);
}
} finally {
- IOUtils.closeQuietly(solrClient);
+ IOUtils.closeQuietly(solrClientSupplier);
}
}
@@ -506,7 +578,7 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
offsetCheckExecutor.shutdown();
offsetCheckExecutor.awaitTermination(30, TimeUnit.SECONDS);
}
- solrClient.close();
+ IOUtils.closeQuietly(solrClientSupplier);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Interrupted while waiting for executor to shutdown");
@@ -516,15 +588,4 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
Util.logMetrics(metrics);
}
}
-
- protected CloudSolrClient createSolrClient(KafkaCrossDcConf conf) {
- return new CloudSolrClient.Builder(
-
Collections.singletonList(conf.get(KafkaCrossDcConf.ZK_CONNECT_STRING)),
- Optional.empty())
- .build();
- }
-
- protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf)
{
- return new KafkaMirroringSink(conf);
- }
}
diff --git
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java
index 6824fd876ac..f0ac2b182a2 100644
---
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java
+++
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java
@@ -23,6 +23,7 @@ import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -59,13 +60,14 @@ public class SolrMessageProcessor extends MessageProcessor
private final MetricRegistry metrics =
SharedMetricRegistries.getOrCreate(Consumer.METRICS_REGISTRY);
- final CloudSolrClient client;
+ final Supplier<CloudSolrClient> clientSupplier;
private static final String VERSION_FIELD = "_version_";
- public SolrMessageProcessor(CloudSolrClient client, ResubmitBackoffPolicy
resubmitBackoffPolicy) {
+ public SolrMessageProcessor(
+ Supplier<CloudSolrClient> clientSupplier, ResubmitBackoffPolicy
resubmitBackoffPolicy) {
super(resubmitBackoffPolicy);
- this.client = client;
+ this.clientSupplier = clientSupplier;
}
@Override
@@ -190,14 +192,14 @@ public class SolrMessageProcessor extends MessageProcessor
if (log.isDebugEnabled()) {
log.debug(
"Sending request to Solr at ZK address={} with params {}",
- ZkStateReader.from(client).getZkClient().getZkServerAddress(),
+
ZkStateReader.from(clientSupplier.get()).getZkClient().getZkServerAddress(),
request.getParams());
}
Result<MirroredSolrRequest<?>> result;
SolrResponseBase response;
Timer.Context ctx = metrics.timer(MetricRegistry.name(type.name(),
"outputTime")).time();
try {
- response = (SolrResponseBase) request.process(client);
+ response = (SolrResponseBase) request.process(clientSupplier.get());
} finally {
ctx.stop();
}
@@ -216,7 +218,7 @@ public class SolrMessageProcessor extends MessageProcessor
if (log.isDebugEnabled()) {
log.debug(
"Finished sending request to Solr at ZK address={} with params {}
status_code={}",
- ZkStateReader.from(client).getZkClient().getZkServerAddress(),
+
ZkStateReader.from(clientSupplier.get()).getZkClient().getZkServerAddress(),
request.getParams(),
status);
}
@@ -352,7 +354,7 @@ public class SolrMessageProcessor extends MessageProcessor
boolean connected = false;
while (!connected) {
try {
- client.connect(); // volatile null-check if already connected
+ clientSupplier.get().connect(); // volatile null-check if already
connected
connected = true;
} catch (Exception e) {
log.error("Unable to connect to solr server. Not consuming.", e);
diff --git
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/DeleteByQueryToIdTest.java
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/DeleteByQueryToIdTest.java
index 475b9f0b73f..95a9f491ab4 100644
---
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/DeleteByQueryToIdTest.java
+++
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/DeleteByQueryToIdTest.java
@@ -164,7 +164,7 @@ public class DeleteByQueryToIdTest extends
SolrCloudTestCase {
return new KafkaCrossDcConsumer(conf, startLatch) {
@Override
protected SolrMessageProcessor createSolrMessageProcessor() {
- return new SolrMessageProcessor(solrClient, resubmitRequest ->
0L) {
+ return new SolrMessageProcessor(solrClientSupplier,
resubmitRequest -> 0L) {
@Override
public Result<MirroredSolrRequest<?>> handleItem(
MirroredSolrRequest<?> mirroredSolrRequest) {
diff --git
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SimpleSolrIntegrationTest.java
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SimpleSolrIntegrationTest.java
index 4c37d159f41..06f5add80c0 100644
---
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SimpleSolrIntegrationTest.java
+++
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SimpleSolrIntegrationTest.java
@@ -51,7 +51,7 @@ public class SimpleSolrIntegrationTest extends
SolrCloudTestCase {
CloudSolrClient cloudClient1 = cluster1.getSolrClient();
- processor = new SolrMessageProcessor(cloudClient1, null);
+ processor = new SolrMessageProcessor(() -> cloudClient1, null);
CollectionAdminRequest.Create create =
CollectionAdminRequest.createCollection(COLLECTION, 1, 1);
diff --git
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java
index 715483893ed..328a629a9fe 100644
---
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java
+++
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java
@@ -43,12 +43,14 @@ import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
@@ -73,6 +75,10 @@ public class KafkaCrossDcConsumerTest {
private KafkaConsumer<String, MirroredSolrRequest<?>> kafkaConsumerMock;
private CloudSolrClient solrClientMock;
private KafkaMirroringSink kafkaMirroringSinkMock;
+ private ClusterStateProvider clusterStateProviderMock;
+ private KafkaCrossDcConsumer.SolrClientSupplier supplier;
+ private AtomicInteger solrClientCounter = new AtomicInteger(0);
+ private boolean clusterStateProviderIsClosed = false;
private SolrMessageProcessor messageProcessorMock;
@@ -86,10 +92,22 @@ public class KafkaCrossDcConsumerTest {
@Before
public void setUp() {
kafkaConsumerMock = mock(KafkaConsumer.class);
+ clusterStateProviderMock = mock(ClusterStateProvider.class);
+ doAnswer(inv ->
clusterStateProviderIsClosed).when(clusterStateProviderMock).isClosed();
solrClientMock = mock(CloudSolrClient.class);
+
doReturn(clusterStateProviderMock).when(solrClientMock).getClusterStateProvider();
kafkaMirroringSinkMock = mock(KafkaMirroringSink.class);
messageProcessorMock = mock(SolrMessageProcessor.class);
conf = testCrossDCConf();
+ supplier =
+ new KafkaCrossDcConsumer.SolrClientSupplier(null) {
+ @Override
+ protected CloudSolrClient createSolrClient() {
+ solrClientCounter.incrementAndGet();
+ return solrClientMock;
+ }
+ };
+
// Set necessary configurations
kafkaCrossDcConsumer =
@@ -106,8 +124,8 @@ public class KafkaCrossDcConsumerTest {
}
@Override
- protected CloudSolrClient createSolrClient(KafkaCrossDcConf conf) {
- return solrClientMock;
+ protected SolrClientSupplier
createSolrClientSupplier(KafkaCrossDcConf conf) {
+ return supplier;
}
@Override
@@ -186,6 +204,15 @@ public class KafkaCrossDcConsumerTest {
assertEquals(1, startLatch.getCount());
}
+ @Test
+ public void testSolrClientSupplier() throws Exception {
+ supplier.get();
+ assertEquals(1, solrClientCounter.get());
+ clusterStateProviderIsClosed = true;
+ supplier.get();
+ assertEquals(2, solrClientCounter.get());
+ }
+
@Test
public void testRunAndShutdown() throws Exception {
// Define the expected behavior of the mocks and set up the test scenario
@@ -219,7 +246,6 @@ public class KafkaCrossDcConsumerTest {
// Verify that the appropriate methods were called on the mocks
verify(kafkaConsumerMock).wakeup();
- verify(solrClientMock).close();
consumerThreadExecutor.shutdown();
consumerThreadExecutor.awaitTermination(10, TimeUnit.SECONDS);
diff --git
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java
index 9cde87ba7d1..2a1b552ab26 100644
---
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java
+++
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java
@@ -55,7 +55,7 @@ public class SolrMessageProcessorTest {
public void setUp() {
client = mock(CloudSolrClient.class);
resubmitBackoffPolicy = mock(ResubmitBackoffPolicy.class);
- solrMessageProcessor = new SolrMessageProcessor(client,
resubmitBackoffPolicy);
+ solrMessageProcessor = new SolrMessageProcessor(() -> client,
resubmitBackoffPolicy);
}
/** Should handle MirroredSolrRequest and return a failed result with no
retry */
diff --git
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/TestMessageProcessor.java
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/TestMessageProcessor.java
index dedeb2575d5..a3c162ce761 100644
---
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/TestMessageProcessor.java
+++
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/TestMessageProcessor.java
@@ -70,7 +70,7 @@ public class TestMessageProcessor {
public void setUp() {
MockitoAnnotations.initMocks(this);
- processor = Mockito.spy(new SolrMessageProcessor(solrClient,
backoffPolicy));
+ processor = Mockito.spy(new SolrMessageProcessor(() -> solrClient,
backoffPolicy));
Mockito.doNothing().when(processor).uncheckedSleep(anyLong());
}