This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new d9c7d485b97 SOLR-17804: re-create SolrClient if closed. (#3424)
d9c7d485b97 is described below

commit d9c7d485b97bbbb798388ab82b535bc5eeaed983
Author: Andrzej BiaƂecki <[email protected]>
AuthorDate: Fri Jul 11 10:29:09 2025 +0200

    SOLR-17804: re-create SolrClient if closed. (#3424)
---
 .../manager/consumer/KafkaCrossDcConsumer.java     | 93 ++++++++++++++++++----
 .../messageprocessor/SolrMessageProcessor.java     | 16 ++--
 .../crossdc/manager/DeleteByQueryToIdTest.java     |  2 +-
 .../crossdc/manager/SimpleSolrIntegrationTest.java |  2 +-
 .../manager/consumer/KafkaCrossDcConsumerTest.java | 32 +++++++-
 .../messageprocessor/SolrMessageProcessorTest.java |  2 +-
 .../messageprocessor/TestMessageProcessor.java     |  2 +-
 7 files changed, 119 insertions(+), 30 deletions(-)

diff --git 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
index a8b121bad8b..58e5e306fe6 100644
--- 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
+++ 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
@@ -31,6 +31,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -85,7 +87,7 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
   private final int maxCollapseRecords;
   private final SolrMessageProcessor messageProcessor;
 
-  protected final CloudSolrClient solrClient;
+  protected SolrClientSupplier solrClientSupplier;
 
   private final ThreadPoolExecutor executor;
 
@@ -95,6 +97,68 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
 
   private final BlockingQueue<Runnable> queue = new BlockingQueue<>(10);
 
+  /**
+   * Supplier for creating and managing a working CloudSolrClient instance. 
This class ensures that
+   * the CloudSolrClient instance doesn't try to use its {@link
+   * org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider} in a 
closed state, which may
+   * happen if e.g. the ZooKeeper connection is lost. When this happens the 
CloudSolrClient is
+   * re-created to ensure it can continue to function properly.
+   *
+   * <p>TODO: this functionality should be moved to the CloudSolrClient itself.
+   */
+  public static class SolrClientSupplier implements Supplier<CloudSolrClient>, 
AutoCloseable {
+    private final AtomicReference<CloudSolrClient> solrClientRef = new 
AtomicReference<>();
+    private final String zkConnectString;
+
+    public SolrClientSupplier(String zkConnectString) {
+      this.zkConnectString = zkConnectString;
+    }
+
+    @Override
+    public void close() {
+      IOUtils.closeQuietly(solrClientRef.get());
+    }
+
+    protected CloudSolrClient createSolrClient() {
+      log.debug("Creating new SolrClient...");
+      return new CloudSolrClient.Builder(
+              Collections.singletonList(zkConnectString), Optional.empty())
+          .build();
+    }
+
+    @Override
+    public CloudSolrClient get() {
+      CloudSolrClient existingClient = solrClientRef.get();
+      if (existingClient == null) {
+        synchronized (solrClientRef) {
+          if (solrClientRef.get() == null) {
+            log.info("Initializing Solr client.");
+            solrClientRef.set(createSolrClient());
+          }
+          return solrClientRef.get();
+        }
+      }
+      if (existingClient.getClusterStateProvider().isClosed()) {
+        // lock out other threads and re-open the client if its 
ClusterStateProvider was closed
+        synchronized (solrClientRef) {
+          // refresh and check again
+          existingClient = solrClientRef.get();
+          if (existingClient.getClusterStateProvider().isClosed()) {
+            log.info("Re-creating Solr client because its ClusterStateProvider 
was closed.");
+            CloudSolrClient newClient = createSolrClient();
+            solrClientRef.set(newClient);
+            IOUtils.closeQuietly(existingClient);
+            return newClient;
+          } else {
+            return existingClient;
+          }
+        }
+      } else {
+        return existingClient;
+      }
+    }
+  }
+
   /**
    * @param conf The Kafka consumer configuration
    * @param startLatch To inform the caller when the Consumer has started
@@ -157,7 +221,7 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
             r -> new Thread(r, "KafkaCrossDcConsumerWorker"));
     executor.prestartAllCoreThreads();
 
-    solrClient = createSolrClient(conf);
+    solrClientSupplier = createSolrClientSupplier(conf);
 
     messageProcessor = createSolrMessageProcessor();
 
@@ -170,8 +234,12 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
     log.info("Created Kafka resubmit producer");
   }
 
+  protected SolrClientSupplier createSolrClientSupplier(KafkaCrossDcConf conf) 
{
+    return new 
SolrClientSupplier(conf.get(KafkaCrossDcConf.ZK_CONNECT_STRING));
+  }
+
   protected SolrMessageProcessor createSolrMessageProcessor() {
-    return new SolrMessageProcessor(solrClient, resubmitRequest -> 0L);
+    return new SolrMessageProcessor(solrClientSupplier, resubmitRequest -> 0L);
   }
 
   public KafkaConsumer<String, MirroredSolrRequest<?>> 
createKafkaConsumer(Properties properties) {
@@ -179,6 +247,10 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
         properties, new StringDeserializer(), new 
MirroredSolrRequestSerializer());
   }
 
+  protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) 
{
+    return new KafkaMirroringSink(conf);
+  }
+
   /**
    * This is where the magic happens.
    *
@@ -219,7 +291,7 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
         log.warn("Failed to close kafka mirroring sink", e);
       }
     } finally {
-      IOUtils.closeQuietly(solrClient);
+      IOUtils.closeQuietly(solrClientSupplier);
     }
   }
 
@@ -511,7 +583,7 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
         offsetCheckExecutor.shutdown();
         offsetCheckExecutor.awaitTermination(30, TimeUnit.SECONDS);
       }
-      solrClient.close();
+      IOUtils.closeQuietly(solrClientSupplier);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       log.warn("Interrupted while waiting for executor to shutdown");
@@ -521,15 +593,4 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
       Util.logMetrics(metrics);
     }
   }
-
-  protected CloudSolrClient createSolrClient(KafkaCrossDcConf conf) {
-    return new CloudSolrClient.Builder(
-            
Collections.singletonList(conf.get(KafkaCrossDcConf.ZK_CONNECT_STRING)),
-            Optional.empty())
-        .build();
-  }
-
-  protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) 
{
-    return new KafkaMirroringSink(conf);
-  }
 }
diff --git 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java
 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java
index b4e14feea78..219287d6606 100644
--- 
a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java
+++ 
b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java
@@ -23,6 +23,7 @@ import java.lang.invoke.MethodHandles;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -59,13 +60,14 @@ public class SolrMessageProcessor extends MessageProcessor
   private final MetricRegistry metrics =
       SharedMetricRegistries.getOrCreate(Consumer.METRICS_REGISTRY);
 
-  final CloudSolrClient client;
+  final Supplier<CloudSolrClient> clientSupplier;
 
   private static final String VERSION_FIELD = "_version_";
 
-  public SolrMessageProcessor(CloudSolrClient client, ResubmitBackoffPolicy 
resubmitBackoffPolicy) {
+  public SolrMessageProcessor(
+      Supplier<CloudSolrClient> clientSupplier, ResubmitBackoffPolicy 
resubmitBackoffPolicy) {
     super(resubmitBackoffPolicy);
-    this.client = client;
+    this.clientSupplier = clientSupplier;
   }
 
   @Override
@@ -190,14 +192,14 @@ public class SolrMessageProcessor extends MessageProcessor
     if (log.isDebugEnabled()) {
       log.debug(
           "Sending request to Solr at ZK address={} with params {}",
-          ZkStateReader.from(client).getZkClient().getZkServerAddress(),
+          
ZkStateReader.from(clientSupplier.get()).getZkClient().getZkServerAddress(),
           request.getParams());
     }
     Result<MirroredSolrRequest<?>> result;
     SolrResponseBase response;
     Timer.Context ctx = metrics.timer(MetricRegistry.name(type.name(), 
"outputTime")).time();
     try {
-      response = (SolrResponseBase) request.process(client);
+      response = (SolrResponseBase) request.process(clientSupplier.get());
     } finally {
       ctx.stop();
     }
@@ -216,7 +218,7 @@ public class SolrMessageProcessor extends MessageProcessor
     if (log.isDebugEnabled()) {
       log.debug(
           "Finished sending request to Solr at ZK address={} with params {} 
status_code={}",
-          ZkStateReader.from(client).getZkClient().getZkServerAddress(),
+          
ZkStateReader.from(clientSupplier.get()).getZkClient().getZkServerAddress(),
           request.getParams(),
           status);
     }
@@ -353,7 +355,7 @@ public class SolrMessageProcessor extends MessageProcessor
     boolean connected = false;
     while (!connected) {
       try {
-        client.connect(); // volatile null-check if already connected
+        clientSupplier.get().connect(); // volatile null-check if already 
connected
         connected = true;
       } catch (Exception e) {
         log.error("Unable to connect to solr server. Not consuming.", e);
diff --git 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/DeleteByQueryToIdTest.java
 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/DeleteByQueryToIdTest.java
index ff13f34e301..d20489d4eec 100644
--- 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/DeleteByQueryToIdTest.java
+++ 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/DeleteByQueryToIdTest.java
@@ -165,7 +165,7 @@ public class DeleteByQueryToIdTest extends 
SolrCloudTestCase {
             return new KafkaCrossDcConsumer(conf, startLatch) {
               @Override
               protected SolrMessageProcessor createSolrMessageProcessor() {
-                return new SolrMessageProcessor(solrClient, resubmitRequest -> 
0L) {
+                return new SolrMessageProcessor(solrClientSupplier, 
resubmitRequest -> 0L) {
                   @Override
                   public Result<MirroredSolrRequest<?>> handleItem(
                       MirroredSolrRequest<?> mirroredSolrRequest) {
diff --git 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SimpleSolrIntegrationTest.java
 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SimpleSolrIntegrationTest.java
index 4c37d159f41..06f5add80c0 100644
--- 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SimpleSolrIntegrationTest.java
+++ 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SimpleSolrIntegrationTest.java
@@ -51,7 +51,7 @@ public class SimpleSolrIntegrationTest extends 
SolrCloudTestCase {
 
     CloudSolrClient cloudClient1 = cluster1.getSolrClient();
 
-    processor = new SolrMessageProcessor(cloudClient1, null);
+    processor = new SolrMessageProcessor(() -> cloudClient1, null);
 
     CollectionAdminRequest.Create create =
         CollectionAdminRequest.createCollection(COLLECTION, 1, 1);
diff --git 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java
 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java
index 5a466d8f5f6..8c160f5125d 100644
--- 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java
+++ 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java
@@ -43,12 +43,14 @@ import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.ClusterStateProvider;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.SolrInputDocument;
@@ -74,6 +76,10 @@ public class KafkaCrossDcConsumerTest {
   private KafkaConsumer<String, MirroredSolrRequest<?>> kafkaConsumerMock;
   private CloudSolrClient solrClientMock;
   private KafkaMirroringSink kafkaMirroringSinkMock;
+  private ClusterStateProvider clusterStateProviderMock;
+  private KafkaCrossDcConsumer.SolrClientSupplier supplier;
+  private AtomicInteger solrClientCounter = new AtomicInteger(0);
+  private boolean clusterStateProviderIsClosed = false;
 
   private SolrMessageProcessor messageProcessorMock;
 
@@ -87,10 +93,22 @@ public class KafkaCrossDcConsumerTest {
   @Before
   public void setUp() {
     kafkaConsumerMock = mock(KafkaConsumer.class);
+    clusterStateProviderMock = mock(ClusterStateProvider.class);
+    doAnswer(inv -> 
clusterStateProviderIsClosed).when(clusterStateProviderMock).isClosed();
     solrClientMock = mock(CloudSolrClient.class);
+    
doReturn(clusterStateProviderMock).when(solrClientMock).getClusterStateProvider();
     kafkaMirroringSinkMock = mock(KafkaMirroringSink.class);
     messageProcessorMock = mock(SolrMessageProcessor.class);
     conf = testCrossDCConf();
+    supplier =
+        new KafkaCrossDcConsumer.SolrClientSupplier(null) {
+          @Override
+          protected CloudSolrClient createSolrClient() {
+            solrClientCounter.incrementAndGet();
+            return solrClientMock;
+          }
+        };
+
     // Set necessary configurations
 
     kafkaCrossDcConsumer =
@@ -107,8 +125,8 @@ public class KafkaCrossDcConsumerTest {
           }
 
           @Override
-          protected CloudSolrClient createSolrClient(KafkaCrossDcConf conf) {
-            return solrClientMock;
+          protected SolrClientSupplier 
createSolrClientSupplier(KafkaCrossDcConf conf) {
+            return supplier;
           }
 
           @Override
@@ -187,6 +205,15 @@ public class KafkaCrossDcConsumerTest {
     assertEquals(1, startLatch.getCount());
   }
 
+  @Test
+  public void testSolrClientSupplier() throws Exception {
+    supplier.get();
+    assertEquals(1, solrClientCounter.get());
+    clusterStateProviderIsClosed = true;
+    supplier.get();
+    assertEquals(2, solrClientCounter.get());
+  }
+
   @Test
   public void testRunAndShutdown() throws Exception {
     // Define the expected behavior of the mocks and set up the test scenario
@@ -220,7 +247,6 @@ public class KafkaCrossDcConsumerTest {
 
     // Verify that the appropriate methods were called on the mocks
     verify(kafkaConsumerMock).wakeup();
-    verify(solrClientMock).close();
 
     consumerThreadExecutor.shutdown();
     consumerThreadExecutor.awaitTermination(10, TimeUnit.SECONDS);
diff --git 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java
 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java
index 9cde87ba7d1..2a1b552ab26 100644
--- 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java
+++ 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java
@@ -55,7 +55,7 @@ public class SolrMessageProcessorTest {
   public void setUp() {
     client = mock(CloudSolrClient.class);
     resubmitBackoffPolicy = mock(ResubmitBackoffPolicy.class);
-    solrMessageProcessor = new SolrMessageProcessor(client, 
resubmitBackoffPolicy);
+    solrMessageProcessor = new SolrMessageProcessor(() -> client, 
resubmitBackoffPolicy);
   }
 
   /** Should handle MirroredSolrRequest and return a failed result with no 
retry */
diff --git 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/TestMessageProcessor.java
 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/TestMessageProcessor.java
index dedeb2575d5..a3c162ce761 100644
--- 
a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/TestMessageProcessor.java
+++ 
b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/TestMessageProcessor.java
@@ -70,7 +70,7 @@ public class TestMessageProcessor {
   public void setUp() {
     MockitoAnnotations.initMocks(this);
 
-    processor = Mockito.spy(new SolrMessageProcessor(solrClient, 
backoffPolicy));
+    processor = Mockito.spy(new SolrMessageProcessor(() -> solrClient, 
backoffPolicy));
     Mockito.doNothing().when(processor).uncheckedSleep(anyLong());
   }
 

Reply via email to